summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-14 02:28:33 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-05-14 02:28:33 +0200
commit189001983350cbcc7f3524ea5983df48c873709b (patch)
tree4eb033f13a599e5ebd77705ba9fc1732bad1f13f /opendc
parent7958af9966cb3ea3237aabca9e9409041c6dbfbb (diff)
feat: Persist provisioner events
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt104
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt52
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt6
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt23
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt38
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt6
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt6
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt18
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt23
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt42
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt2
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt5
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt55
-rw-r--r--opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt6
14 files changed, 298 insertions, 88 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 3603ae69..c25834a7 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,6 +1,7 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.odcsim.SimulationContext
+import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
@@ -19,6 +20,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -57,11 +59,11 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
- public var submittedVms = 0L
- public var queuedVms = 0L
- public var runningVms = 0L
- public var finishedVms = 0L
- public var unscheduledVms = 0L
+ public var submittedVms = 0
+ public var queuedVms = 0
+ public var runningVms = 0
+ public var finishedVms = 0
+ public var unscheduledVms = 0
private var maxCores = 0
private var maxMemory = 0L
@@ -71,6 +73,13 @@ class SimpleVirtProvisioningService(
*/
private val allocationLogic = allocationPolicy()
+ /**
+ * The [EventFlow] to emit the events.
+ */
+ internal val eventFlow = EventFlow<VirtProvisioningEvent>()
+
+ override val events: Flow<VirtProvisioningEvent> = eventFlow
+
init {
launch {
val provisionedNodes = provisioningService.nodes()
@@ -96,8 +105,17 @@ class SimpleVirtProvisioningService(
image: Image,
flavor: Flavor
): Server = withContext(coroutineContext) {
- submittedVms++
- queuedVms++
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ ++submittedVms,
+ runningVms,
+ finishedVms,
+ ++queuedVms,
+ unscheduledVms
+ ))
+
suspendCancellableCoroutine<Server> { cont ->
val vmInstance = ImageView(name, image, flavor, cont)
incomingImages += vmInstance
@@ -141,7 +159,17 @@ class SimpleVirtProvisioningService(
if (selectedHv == null) {
if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
- unscheduledVms++
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ ++unscheduledVms
+ ))
+
incomingImages -= imageInstance
logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]")
@@ -168,8 +196,17 @@ class SimpleVirtProvisioningService(
)
imageInstance.server = server
imageInstance.continuation.resume(server)
- queuedVms--
- runningVms++
+
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ ))
activeImages += imageInstance
server.events
@@ -178,8 +215,17 @@ class SimpleVirtProvisioningService(
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
- runningVms--
- finishedVms++
+
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ --runningVms,
+ ++finishedVms,
+ queuedVms,
+ unscheduledVms
+ ))
activeImages -= imageInstance
selectedHv.provisionedCores -= server.flavor.cpuCount
@@ -211,6 +257,7 @@ class SimpleVirtProvisioningService(
if (server in hypervisors) {
// Corner case for when the hypervisor already exists
availableHypervisors += hypervisors.getValue(server)
+
} else {
val hv = HypervisorView(
server.uid,
@@ -223,11 +270,33 @@ class SimpleVirtProvisioningService(
maxMemory = max(maxMemory, server.flavor.memorySize)
hypervisors[server] = hv
}
+
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ ))
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val hv = hypervisors[server] ?: return
availableHypervisors -= hv
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ ))
+
if (incomingImages.isNotEmpty()) {
requestCycle()
}
@@ -242,6 +311,17 @@ class SimpleVirtProvisioningService(
hv.driver = server.services[VirtDriver]
availableHypervisors += hv
+ eventFlow.emit(VirtProvisioningEvent.MetricsAvailable(
+ this@SimpleVirtProvisioningService,
+ hypervisors.size,
+ availableHypervisors.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ ))
+
hv.driver.events
.onEach { event ->
if (event is HypervisorEvent.VmsUpdated) {
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt
new file mode 100644
index 00000000..39f75913
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt
@@ -0,0 +1,52 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.virt.service
+
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+
+
+/**
+ * An event that is emitted by the [VirtProvisioningService].
+ */
+public sealed class VirtProvisioningEvent {
+ /**
+ * The service that has emitted the event.
+ */
+ public abstract val provisioner: VirtProvisioningService
+
+ /**
+ * An event emitted for writing metrics.
+ */
+ data class MetricsAvailable(
+ override val provisioner: VirtProvisioningService,
+ public val totalHostCount: Int,
+ public val availableHostCount: Int,
+ public val totalVmCount: Int,
+ public val activeVmCount: Int,
+ public val inactiveVmCount: Int,
+ public val waitingVmCount: Int,
+ public val failedVmCount: Int
+ ) : VirtProvisioningEvent()
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 2ad7df84..c4cbd711 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -5,6 +5,7 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import kotlinx.coroutines.flow.Flow
/**
* A service for VM provisioning on a cloud.
@@ -16,6 +17,11 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
+ * The events emitted by the service.
+ */
+ public val events: Flow<VirtProvisioningEvent>
+
+ /**
* Obtain the active hypervisors for this provisioner.
*/
public suspend fun drivers(): Set<VirtDriver>
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
index ac43b6ac..548400d6 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentHelpers.kt
@@ -36,6 +36,7 @@ import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
import com.atlarge.opendc.core.failure.FailureDomain
@@ -150,13 +151,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ reporter.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server)
hypervisor.server.events
.onEach { event ->
val time = clock.millis()
when (event) {
is ServerEvent.StateChanged -> {
- reporter.reportHostStateChange(time, hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ reporter.reportHostStateChange(time, hypervisor, event.server)
}
}
}
@@ -173,11 +174,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- event.hostServer,
- scheduler.submittedVms,
- scheduler.queuedVms,
- scheduler.runningVms,
- scheduler.finishedVms
+ event.hostServer
)
}
}
@@ -188,6 +185,16 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Ex
.onEach { reporter.reportPowerConsumption(hypervisor.server, it) }
.launchIn(domain)
}
+
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is VirtProvisioningEvent.MetricsAvailable ->
+ reporter.reportProvisionerMetrics(clock.millis(), event)
+ }
+
+ }
+ .launchIn(domain)
}
/**
@@ -197,7 +204,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
val domain = simulationContext.domain
try {
- var submitted = 0L
+ var submitted = 0
val finished = Channel<Unit>(Channel.CONFLATED)
val hypervisors = TreeSet(scheduler.drivers().map { (it as SimpleVirtDriver).server.name })
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
index 9455cb9d..7d65930c 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunner.kt
@@ -33,15 +33,11 @@ import com.atlarge.opendc.experiments.sc20.util.DatabaseHelper
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import kotlinx.coroutines.CoroutineDispatcher
-import kotlinx.coroutines.asCoroutineDispatcher
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.Closeable
import java.io.File
import java.util.concurrent.Executors
+import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicInteger
import javax.sql.DataSource
import kotlin.system.measureTimeMillis
@@ -159,11 +155,19 @@ public class ExperimentRunner(
val plan = createPlan()
val total = plan.size
val finished = AtomicInteger()
- val dispatcher = Executors.newWorkStealingPool(parallelism).asCoroutineDispatcher()
+ val executorService = Executors.newCachedThreadPool()
+ val planIterator = plan.iterator()
+ val futures = mutableListOf<Future<*>>()
- runBlocking {
- val mainDispatcher = coroutineContext[CoroutineDispatcher.Key]!!
- for (run in plan) {
+ while (planIterator.hasNext()) {
+ futures.clear()
+
+ repeat(parallelism) {
+ if (!planIterator.hasNext()) {
+ return@repeat
+ }
+
+ val run = planIterator.next()
val scenarioId = scenarioIds[run.scenario]!!
rawTraceReaders.computeIfAbsent(run.scenario.workload.name) { name ->
@@ -171,15 +175,14 @@ public class ExperimentRunner(
Sc20RawParquetTraceReader(File(tracePath, name))
}
- launch(dispatcher) {
- launch(mainDispatcher) {
+ val future = executorService.submit {
+ synchronized(helper) {
helper.startRun(scenarioId, run.id)
}
logger.info { "[${finished.get()}/$total] Starting run ($scenarioId, ${run.id})" }
try {
-
val duration = measureTimeMillis {
val reporter = reporterProvider.createReporter(scenarioIds[run.scenario]!!, run.id)
val traceReader = createTraceReader(run.scenario.workload.name, performanceInterferenceModel, run)
@@ -187,6 +190,7 @@ public class ExperimentRunner(
try {
run.scenario(run, reporter, environmentReader, traceReader)
+ logger.info { "Done" }
} finally {
reporter.close()
}
@@ -195,17 +199,23 @@ public class ExperimentRunner(
finished.incrementAndGet()
logger.info { "[${finished.get()}/$total] Finished run ($scenarioId, ${run.id}) in $duration milliseconds" }
- withContext(mainDispatcher) {
+ synchronized(helper) {
helper.finishRun(scenarioId, run.id, hasFailed = false)
}
} catch (e: Throwable) {
logger.error("A run has failed", e)
finished.incrementAndGet()
- withContext(mainDispatcher) {
+ synchronized(helper) {
helper.finishRun(scenarioId, run.id, hasFailed = true)
}
}
}
+
+ futures += future
+ }
+
+ for (future in futures) {
+ future.get()
}
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
index b072fb12..9cbfcdc1 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/ExperimentRunnerCli.kt
@@ -29,6 +29,7 @@ import com.atlarge.opendc.experiments.sc20.reporter.ExperimentPostgresReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporter
import com.atlarge.opendc.experiments.sc20.reporter.ExperimentReporterProvider
import com.atlarge.opendc.experiments.sc20.reporter.PostgresHostMetricsWriter
+import com.atlarge.opendc.experiments.sc20.reporter.PostgresProvisionerMetricsWriter
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
import com.github.ajalt.clikt.core.CliktCommand
@@ -132,6 +133,7 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
override fun run() {
val ds = HikariDataSource()
+ ds.maximumPoolSize = Runtime.getRuntime().availableProcessors() * 3
ds.jdbcUrl = jdbcUrl
ds.addDataSourceProperty("reWriteBatchedInserts", "true")
@@ -178,10 +180,12 @@ internal sealed class Reporter(name: String) : OptionGroup(name), ExperimentRepo
override fun createReporter(scenario: Long, run: Int): ExperimentReporter {
val hostWriter = PostgresHostMetricsWriter(ds, batchSize)
- val delegate = ExperimentPostgresReporter(scenario, run, hostWriter)
+ val provisionerWriter = PostgresProvisionerMetricsWriter(ds, batchSize)
+ val delegate = ExperimentPostgresReporter(scenario, run, hostWriter, provisionerWriter)
return object : ExperimentReporter by delegate {
override fun close() {
hostWriter.close()
+ provisionerWriter.close()
}
}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
index a4a42b6e..11b35bf9 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Portfolios.kt
@@ -69,10 +69,10 @@ object HorVerPortfolio : AbstractSc20Portfolio("horizontal_vs_vertical") {
)
override val workloads = listOf(
- // Workload("solvinity", 0.1),
- // Workload("solvinity", 0.25),
+ Workload("solvinity", 0.1),
+ Workload("solvinity", 0.25),
Workload("solvinity", 0.5),
- Workload("solvinity", 0.10)
+ Workload("solvinity", 1.0)
)
override val operationalPhenomena = listOf(
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt
index 1c752cb1..049035cc 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ExperimentReporter.kt
@@ -26,6 +26,7 @@ package com.atlarge.opendc.experiments.sc20.reporter
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
import java.io.Closeable
/**
@@ -43,17 +44,13 @@ interface ExperimentReporter : Closeable {
fun reportHostStateChange(
time: Long,
driver: VirtDriver,
- server: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long
+ server: Server
) {}
/**
* Report the power consumption of a host.
*/
- fun reportPowerConsumption(host: Server, draw: Double)
+ fun reportPowerConsumption(host: Server, draw: Double) {}
/**
* This method is invoked for a host for each slice that is finishes.
@@ -68,10 +65,11 @@ interface ExperimentReporter : Closeable {
cpuDemand: Double,
numberOfDeployedImages: Int,
hostServer: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long,
duration: Long = 5 * 60 * 1000L
) {}
+
+ /**
+ * This method is invoked for a provisioner event.
+ */
+ fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
index 58d384e7..87bf524f 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ParquetExperimentReporter.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.experiments.sc20.reporter
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.virt.driver.VirtDriver
@@ -90,11 +89,7 @@ class ExperimentParquetReporter(destination: File) :
override fun reportHostStateChange(
time: Long,
driver: VirtDriver,
- server: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long
+ server: Server
) {
logger.info("Host ${server.uid} changed state ${server.state} [$time]")
@@ -111,10 +106,6 @@ class ExperimentParquetReporter(destination: File) :
0.0,
0,
server,
- submittedVms,
- queuedVms,
- runningVms,
- finishedVms,
duration
)
@@ -141,10 +132,6 @@ class ExperimentParquetReporter(destination: File) :
cpuDemand: Double,
numberOfDeployedImages: Int,
hostServer: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long,
duration: Long
) {
val record = GenericData.Record(schema)
@@ -161,10 +148,10 @@ class ExperimentParquetReporter(destination: File) :
record.put("host_state", hostServer.state)
record.put("host_usage", cpuUsage)
record.put("power_draw", lastPowerConsumption[hostServer] ?: 200.0)
- record.put("total_submitted_vms", submittedVms)
- record.put("total_queued_vms", queuedVms)
- record.put("total_running_vms", runningVms)
- record.put("total_finished_vms", finishedVms)
+ record.put("total_submitted_vms", -1)
+ record.put("total_queued_vms", -1)
+ record.put("total_running_vms", -1)
+ record.put("total_finished_vms", -1)
queue.put(record)
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
index a92278d8..6f220640 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresExperimentReporter.kt
@@ -24,15 +24,20 @@
package com.atlarge.opendc.experiments.sc20.reporter
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
-class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: PostgresHostMetricsWriter) : ExperimentReporter {
+class ExperimentPostgresReporter(
+ val scenario: Long,
+ val run: Int,
+ val hostWriter: PostgresHostMetricsWriter,
+ val provisionerWriter: PostgresProvisionerMetricsWriter
+) : ExperimentReporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
override fun reportVmStateChange(time: Long, server: Server) {}
@@ -40,11 +45,7 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P
override fun reportHostStateChange(
time: Long,
driver: VirtDriver,
- server: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long
+ server: Server
) {
val lastServerState = lastServerStates[server]
logger.debug("Host ${server.uid} changed state ${server.state} [$time]")
@@ -61,10 +62,6 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P
0.0,
0,
server,
- submittedVms,
- queuedVms,
- runningVms,
- finishedVms,
duration
)
@@ -93,13 +90,9 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P
cpuDemand: Double,
numberOfDeployedImages: Int,
hostServer: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long,
duration: Long
) {
- writer.write(
+ hostWriter.write(
scenario, run, HostMetrics(
time,
duration,
@@ -116,5 +109,22 @@ class ExperimentPostgresReporter(val scenario: Long, val run: Int, val writer: P
)
}
+ override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ provisionerWriter.write(
+ scenario,
+ run,
+ ProvisionerMetrics(
+ time,
+ event.totalHostCount,
+ event.availableHostCount,
+ event.totalVmCount,
+ event.activeVmCount,
+ event.inactiveVmCount,
+ event.waitingVmCount,
+ event.failedVmCount
+ )
+ )
+ }
+
override fun close() {}
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
index 55b80e4c..5eb55f20 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresHostMetricsWriter.kt
@@ -54,4 +54,6 @@ public class PostgresHostMetricsWriter(ds: DataSource, batchSize: Int) :
stmt.setDouble(13, action.metrics.cpuDemand)
stmt.setDouble(14, action.metrics.powerDraw)
}
+
+ override fun toString(): String = "host-writer"
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
index 367a3a5a..33c2d40e 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresMetricsWriter.kt
@@ -48,7 +48,7 @@ public abstract class PostgresMetricsWriter<T>(
/**
* The thread for the actual writer.
*/
- private val writerThread: Thread = thread(name = "host-metrics-writer") { run() }
+ private val writerThread: Thread = thread(name = "metrics-writer") { run() }
/**
* Write the specified metrics to the database.
@@ -79,6 +79,7 @@ public abstract class PostgresMetricsWriter<T>(
* Start the writer thread.
*/
override fun run() {
+ writerThread.name = toString()
val conn = ds.connection
var batch = 0
@@ -114,7 +115,9 @@ public abstract class PostgresMetricsWriter<T>(
}
}
}
+
} finally {
+ stmt.executeBatch()
conn.commit()
conn.close()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
new file mode 100644
index 00000000..7bc88959
--- /dev/null
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/PostgresProvisionerMetricsWriter.kt
@@ -0,0 +1,55 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.experiments.sc20.reporter
+
+import java.sql.Connection
+import java.sql.PreparedStatement
+import java.sql.Timestamp
+import javax.sql.DataSource
+
+/**
+ * A [PostgresMetricsWriter] for persisting [ProvisionerMetrics].
+ */
+public class PostgresProvisionerMetricsWriter(ds: DataSource, batchSize: Int) :
+ PostgresMetricsWriter<ProvisionerMetrics>(ds, batchSize) {
+ override fun createStatement(conn: Connection): PreparedStatement {
+ return conn.prepareStatement("INSERT INTO provisioner_metrics (scenario_id, run_id, timestamp, host_total_count, host_available_count, vm_total_count, vm_active_count, vm_inactive_count, vm_waiting_count, vm_failed_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
+ }
+
+ override fun persist(action: Action.Write<ProvisionerMetrics>, stmt: PreparedStatement) {
+ stmt.setLong(1, action.scenario)
+ stmt.setInt(2, action.run)
+ stmt.setTimestamp(3, Timestamp(action.metrics.time))
+ stmt.setInt(4, action.metrics.totalHostCount)
+ stmt.setInt(5, action.metrics.availableHostCount)
+ stmt.setInt(6, action.metrics.totalVmCount)
+ stmt.setInt(7, action.metrics.activeVmCount)
+ stmt.setInt(8, action.metrics.inactiveVmCount)
+ stmt.setInt(9, action.metrics.waitingVmCount)
+ stmt.setInt(10, action.metrics.failedVmCount)
+ }
+
+ override fun toString(): String = "provisioner-writer"
+}
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 5177c04a..1edb7bc2 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -158,7 +158,7 @@ class Sc20IntegrationTest {
var totalOvercommissionedBurst = 0L
var totalInterferedBurst = 0L
- override suspend fun reportHostSlice(
+ override fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
@@ -168,10 +168,6 @@ class Sc20IntegrationTest {
cpuDemand: Double,
numberOfDeployedImages: Int,
hostServer: Server,
- submittedVms: Long,
- queuedVms: Long,
- runningVms: Long,
- finishedVms: Long,
duration: Long
) {
totalRequestedBurst += requestedBurst