summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt24
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt)12
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt)17
-rw-r--r--opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt8
4 files changed, 35 insertions, 26 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
index b1964197..c74189c2 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Experiment.kt
@@ -184,19 +184,19 @@ suspend fun createProvisioner(
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc20Monitor) {
+suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, reporter: Sc20Reporter) {
val domain = simulationContext.domain
val hypervisors = scheduler.drivers()
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ reporter.reportHostStateChange(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
hypervisor.server.events
.onEach { event ->
when (event) {
is ServerEvent.StateChanged -> {
- monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
+ reporter.reportHostStateChange(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms)
}
}
}
@@ -204,7 +204,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2
hypervisor.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(
+ is HypervisorEvent.SliceFinished -> reporter.reportHostSlice(
simulationContext.clock.millis(),
event.requestedBurst,
event.grantedBurst,
@@ -228,7 +228,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Sc2
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: Sc20Monitor, vmPlacements: Map<String, String> = emptyMap()) {
+suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, reporter: Sc20Reporter, vmPlacements: Map<String, String> = emptyMap()) {
val domain = simulationContext.domain
try {
@@ -266,7 +266,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
server.events
.onEach {
if (it is ServerEvent.StateChanged) {
- monitor.onVmStateChanged(it.server)
+ reporter.reportVmStateChange(it.server)
}
delay(1)
@@ -299,7 +299,7 @@ fun main(args: Array<String>) {
println("allocation-policy: ${cli.allocationPolicy}")
val start = System.currentTimeMillis()
- val monitor: Sc20Monitor = Sc20ParquetMonitor(cli.outputFile)
+ val reporter: Sc20Reporter = Sc20ParquetReporter(cli.outputFile)
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider("test")
@@ -316,7 +316,7 @@ fun main(args: Array<String>) {
Sc20PerformanceInterferenceReader(performanceInterferenceStream)
.construct()
} catch (e: Throwable) {
- monitor.close()
+ reporter.close()
throw e
}
val vmPlacements = if (cli.vmPlacementFile == null) {
@@ -328,7 +328,7 @@ fun main(args: Array<String>) {
val traceReader = try {
createTraceReader(File(cli.traceDirectory), performanceInterferenceModel, cli.getSelectedVmList(), cli.seed)
} catch (e: Throwable) {
- monitor.close()
+ reporter.close()
throw e
}
val allocationPolicy = when (cli.allocationPolicy) {
@@ -355,8 +355,8 @@ fun main(args: Array<String>) {
null
}
- attachMonitor(scheduler, monitor)
- processTrace(traceReader, scheduler, chan, monitor, vmPlacements)
+ attachMonitor(scheduler, reporter)
+ processTrace(traceReader, scheduler, chan, reporter, vmPlacements)
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
@@ -371,5 +371,5 @@ fun main(args: Array<String>) {
}
// Explicitly close the monitor to flush its buffer
- monitor.close()
+ reporter.close()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
index 5e554196..2b653b69 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetMonitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetReporter.kt
@@ -14,9 +14,9 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread
-class Sc20ParquetMonitor(
+class Sc20ParquetReporter(
destination: String
-) : Sc20Monitor {
+) : Sc20Reporter {
private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()
private val schema = SchemaBuilder
.record("slice")
@@ -60,9 +60,9 @@ class Sc20ParquetMonitor(
}
}
- override suspend fun onVmStateChanged(server: Server) {}
+ override suspend fun reportVmStateChange(server: Server) {}
- override suspend fun serverStateChanged(
+ override suspend fun reportHostStateChange(
driver: VirtDriver,
server: Server,
submittedVms: Long,
@@ -73,7 +73,7 @@ class Sc20ParquetMonitor(
val lastServerState = lastServerStates[server]
if (server.state == ServerState.SHUTOFF && lastServerState != null) {
val duration = simulationContext.clock.millis() - lastServerState.second
- onSliceFinish(
+ reportHostSlice(
simulationContext.clock.millis(),
0,
0,
@@ -96,7 +96,7 @@ class Sc20ParquetMonitor(
lastServerStates[server] = Pair(server.state, simulationContext.clock.millis())
}
- override suspend fun onSliceFinish(
+ override suspend fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt
index 4b8b80a8..84500417 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Reporter.kt
@@ -28,10 +28,16 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import java.io.Closeable
-interface Sc20Monitor : Closeable {
- suspend fun onVmStateChanged(server: Server) {}
+interface Sc20Reporter : Closeable {
+ /**
+ * This method is invoked when the state of a VM changes.
+ */
+ suspend fun reportVmStateChange(server: Server) {}
- suspend fun serverStateChanged(
+ /**
+ * This method is invoked when the state of a host changes.
+ */
+ suspend fun reportHostStateChange(
driver: VirtDriver,
server: Server,
submittedVms: Long,
@@ -40,7 +46,10 @@ interface Sc20Monitor : Closeable {
finishedVms: Long
) {}
- suspend fun onSliceFinish(
+ /**
+ * This method is invoked for a host for each slice that is finishes.
+ */
+ suspend fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 2bf6bcf3..239d018a 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -63,7 +63,7 @@ class Sc20IntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var monitor: TestSc20Monitor
+ private lateinit var monitor: TestSc20Reporter
/**
* Setup the experimental environment.
@@ -73,7 +73,7 @@ class Sc20IntegrationTest {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
simulationEngine = provider("test")
root = simulationEngine.newDomain("root")
- monitor = TestSc20Monitor()
+ monitor = TestSc20Reporter()
}
/**
@@ -151,13 +151,13 @@ class Sc20IntegrationTest {
return Sc20ClusterEnvironmentReader(stream)
}
- class TestSc20Monitor : Sc20Monitor {
+ class TestSc20Reporter : Sc20Reporter {
var totalRequestedBurst = 0L
var totalGrantedBurst = 0L
var totalOvercommissionedBurst = 0L
var totalInterferedBurst = 0L
- override suspend fun onSliceFinish(
+ override suspend fun reportHostSlice(
time: Long,
requestedBurst: Long,
grantedBurst: Long,