summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-07 15:44:36 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-07 16:13:16 +0100
commit9bb91897404bbeac1d5f7a7f890abd3a9d5d9084 (patch)
tree9e492df044306b6230444b73f2c3db59ef45440e /simulator/opendc-experiments/opendc-experiments-capelin/src
parentdfbca195cbe1d6c4eebe7ccd4cc707c84ac43e79 (diff)
compute: Move ComputeService implementation in service module
This change introduces the ComputeService interface (previously VirtProvisioningService) and provides a central implementation in opendc-compute-service. Previously, the implementation of this interface was bound to the simulator package, which meant that independent business logic could not be re-used without importing the simulator code.
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt43
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt11
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt6
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt6
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt33
5 files changed, 60 insertions, 39 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 2be3fe99..eb819b58 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -39,13 +39,15 @@ import org.opendc.compute.api.ServerWatcher
import org.opendc.compute.core.metal.NODE_CLUSTER
import org.opendc.compute.core.metal.NodeEvent
import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.virt.HostEvent
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.HostEvent
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.AllocationPolicy
+import org.opendc.compute.simulator.SimHostProvisioner
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
@@ -137,6 +139,12 @@ public fun createTraceReader(
)
}
+public data class ProvisionerResult(
+ val metal: ProvisioningService,
+ val provisioner: SimHostProvisioner,
+ val compute: ComputeServiceImpl
+)
+
/**
* Construct the environment for a VM provisioner and return the provisioner instance.
*/
@@ -146,33 +154,40 @@ public suspend fun createProvisioner(
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
eventTracer: EventTracer
-): Pair<ProvisioningService, SimVirtProvisioningService> {
+): ProvisionerResult {
val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider())
+ val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider())
+ val hosts = provisioner.provisionAll()
+
+ val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
+
+ for (host in hosts) {
+ scheduler.addHost(host)
+ }
// Wait for the hypervisors to be spawned
delay(10)
- return bareMetalProvisioner to scheduler
+ return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler)
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public suspend fun attachMonitor(
+public fun attachMonitor(
coroutineScope: CoroutineScope,
clock: Clock,
- scheduler: SimVirtProvisioningService,
+ scheduler: ComputeService,
monitor: ExperimentMonitor
) {
- val hypervisors = scheduler.drivers()
+ val hypervisors = scheduler.hosts
// Monitor hypervisor events
for (hypervisor in hypervisors) {
@@ -201,7 +216,7 @@ public suspend fun attachMonitor(
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- event.host
+ (event.driver as SimHost).node
)
}
}
@@ -216,7 +231,7 @@ public suspend fun attachMonitor(
scheduler.events
.onEach { event ->
when (event) {
- is VirtProvisioningEvent.MetricsAvailable ->
+ is ComputeServiceEvent.MetricsAvailable ->
monitor.reportProvisionerMetrics(clock.millis(), event)
}
}
@@ -230,7 +245,7 @@ public suspend fun processTrace(
coroutineScope: CoroutineScope,
clock: Clock,
reader: TraceReader<VmWorkload>,
- scheduler: SimVirtProvisioningService,
+ scheduler: ComputeService,
chan: Channel<Unit>,
monitor: ExperimentMonitor
) {
@@ -265,7 +280,7 @@ public suspend fun processTrace(
scheduler.events
.takeWhile {
when (it) {
- is VirtProvisioningEvent.MetricsAvailable ->
+ is ComputeServiceEvent.MetricsAvailable ->
it.inactiveVmCount + it.failedVmCount != submitted
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 75b0d735..ff0a026d 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -28,6 +28,12 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
+import org.opendc.compute.service.scheduler.AllocationPolicy
+import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
+import org.opendc.compute.service.scheduler.RandomAllocationPolicy
import org.opendc.compute.simulator.allocation.*
import org.opendc.experiments.capelin.experiment.attachMonitor
import org.opendc.experiments.capelin.experiment.createFailureDomain
@@ -151,7 +157,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
)
testScope.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(
+ val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner(
this,
clock,
environment,
@@ -190,7 +196,8 @@ public abstract class Portfolio(name: String) : Experiment(name) {
logger.debug("FINISHED=${scheduler.finishedVms}")
failureDomain?.cancel()
- scheduler.terminate()
+ scheduler.close()
+ provisioner.close()
}
try {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 9e4adcc5..6039289f 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -25,8 +25,8 @@ package org.opendc.experiments.capelin.monitor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.virt.Host
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
import java.io.Closeable
/**
@@ -73,5 +73,5 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked for a provisioner event.
*/
- public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {}
+ public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index 0912c8ae..b879399c 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -26,8 +26,8 @@ import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.virt.Host
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
import org.opendc.experiments.capelin.telemetry.HostEvent
import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
@@ -176,7 +176,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
}
}
- override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
provisionerWriter.write(
ProvisionerEvent(
time,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index fca523cd..73525ae2 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -34,8 +34,8 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
import org.opendc.experiments.capelin.experiment.attachMonitor
import org.opendc.experiments.capelin.experiment.createFailureDomain
import org.opendc.experiments.capelin.experiment.createProvisioner
@@ -97,7 +97,7 @@ class CapelinIntegrationTest {
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var scheduler: SimVirtProvisioningService
+ lateinit var scheduler: ComputeServiceImpl
val tracer = EventTracer(clock)
testScope.launch {
@@ -108,8 +108,8 @@ class CapelinIntegrationTest {
allocationPolicy,
tracer
)
- val bareMetalProvisioner = res.first
- scheduler = res.second
+ val bareMetalProvisioner = res.metal
+ scheduler = res.compute
val failureDomain = if (failures) {
println("ENABLING failures")
@@ -138,8 +138,9 @@ class CapelinIntegrationTest {
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
failureDomain?.cancel()
- scheduler.terminate()
+ scheduler.close()
monitor.close()
+ res.provisioner.close()
}
runSimulation()
@@ -148,9 +149,9 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(1679510908774, monitor.totalRequestedBurst) },
- { assertEquals(384100282091, monitor.totalGrantedBurst) },
- { assertEquals(1282152242721, monitor.totalOvercommissionedBurst) },
+ { assertEquals(1678587333640, monitor.totalRequestedBurst) },
+ { assertEquals(438118200924, monitor.totalGrantedBurst) },
+ { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
}
@@ -162,19 +163,16 @@ class CapelinIntegrationTest {
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
- lateinit var scheduler: SimVirtProvisioningService
val tracer = EventTracer(clock)
testScope.launch {
- val res = createProvisioner(
+ val (_, provisioner, scheduler) = createProvisioner(
this,
clock,
environmentReader,
allocationPolicy,
tracer
)
- scheduler = res.second
-
attachMonitor(this, clock, scheduler, monitor)
processTrace(
this,
@@ -187,17 +185,18 @@ class CapelinIntegrationTest {
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
- scheduler.terminate()
+ scheduler.close()
monitor.close()
+ provisioner.close()
}
runSimulation()
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(710487768664, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(118846235815, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(584211294239, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(705128393965, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}