diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-07 15:44:36 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-07 16:13:16 +0100 |
| commit | 9bb91897404bbeac1d5f7a7f890abd3a9d5d9084 (patch) | |
| tree | 9e492df044306b6230444b73f2c3db59ef45440e /simulator/opendc-experiments/opendc-experiments-capelin/src | |
| parent | dfbca195cbe1d6c4eebe7ccd4cc707c84ac43e79 (diff) | |
compute: Move ComputeService implementation in service module
This change introduces the ComputeService interface (previously
VirtProvisioningService) and provides a central implementation in
opendc-compute-service.
Previously, the implementation of this interface was bound to the
simulator package, which meant that independent business logic could not
be re-used without importing the simulator code.
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src')
5 files changed, 60 insertions, 39 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 2be3fe99..eb819b58 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -39,13 +39,15 @@ import org.opendc.compute.api.ServerWatcher import org.opendc.compute.core.metal.NODE_CLUSTER import org.opendc.compute.core.metal.NodeEvent import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HostEvent -import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader @@ -137,6 +139,12 @@ public fun createTraceReader( ) } +public data class ProvisionerResult( + val metal: ProvisioningService, + val provisioner: SimHostProvisioner, + val compute: ComputeServiceImpl +) + /** * Construct the environment for a VM provisioner and return the provisioner instance. */ @@ -146,33 +154,40 @@ public suspend fun createProvisioner( environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, eventTracer: EventTracer -): Pair<ProvisioningService, SimVirtProvisioningService> { +): ProvisionerResult { val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider()) + val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider()) + val hosts = provisioner.provisionAll() + + val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + + for (host in hosts) { + scheduler.addHost(host) + } // Wait for the hypervisors to be spawned delay(10) - return bareMetalProvisioner to scheduler + return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler) } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public suspend fun attachMonitor( +public fun attachMonitor( coroutineScope: CoroutineScope, clock: Clock, - scheduler: SimVirtProvisioningService, + scheduler: ComputeService, monitor: ExperimentMonitor ) { - val hypervisors = scheduler.drivers() + val hypervisors = scheduler.hosts // Monitor hypervisor events for (hypervisor in hypervisors) { @@ -201,7 +216,7 @@ public suspend fun attachMonitor( event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.host + (event.driver as SimHost).node ) } } @@ -216,7 +231,7 @@ public suspend fun attachMonitor( scheduler.events .onEach { event -> when (event) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> monitor.reportProvisionerMetrics(clock.millis(), event) } } @@ -230,7 +245,7 @@ public suspend fun processTrace( coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, - scheduler: SimVirtProvisioningService, + scheduler: ComputeService, chan: Channel<Unit>, monitor: ExperimentMonitor ) { @@ -265,7 +280,7 @@ public suspend fun processTrace( scheduler.events .takeWhile { when (it) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> it.inactiveVmCount + it.failedVmCount != submitted } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 75b0d735..ff0a026d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -28,6 +28,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy +import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain @@ -151,7 +157,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { ) testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( + val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( this, clock, environment, @@ -190,7 +196,8 @@ public abstract class Portfolio(name: String) : Experiment(name) { logger.debug("FINISHED=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() + provisioner.close() } try { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 9e4adcc5..6039289f 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -25,8 +25,8 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.Host -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import java.io.Closeable /** @@ -73,5 +73,5 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. */ - public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index 0912c8ae..b879399c 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -26,8 +26,8 @@ import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.Host -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.experiments.capelin.telemetry.HostEvent import org.opendc.experiments.capelin.telemetry.ProvisionerEvent import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter @@ -176,7 +176,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } } - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { provisionerWriter.write( ProvisionerEvent( time, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index fca523cd..73525ae2 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,8 +34,8 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.core.metal.Node import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain import org.opendc.experiments.capelin.experiment.createProvisioner @@ -97,7 +97,7 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: SimVirtProvisioningService + lateinit var scheduler: ComputeServiceImpl val tracer = EventTracer(clock) testScope.launch { @@ -108,8 +108,8 @@ class CapelinIntegrationTest { allocationPolicy, tracer ) - val bareMetalProvisioner = res.first - scheduler = res.second + val bareMetalProvisioner = res.metal + scheduler = res.compute val failureDomain = if (failures) { println("ENABLING failures") @@ -138,8 +138,9 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() monitor.close() + res.provisioner.close() } runSimulation() @@ -148,9 +149,9 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1679510908774, monitor.totalRequestedBurst) }, - { assertEquals(384100282091, monitor.totalGrantedBurst) }, - { assertEquals(1282152242721, monitor.totalOvercommissionedBurst) }, + { assertEquals(1678587333640, monitor.totalRequestedBurst) }, + { assertEquals(438118200924, monitor.totalGrantedBurst) }, + { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -162,19 +163,16 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - lateinit var scheduler: SimVirtProvisioningService val tracer = EventTracer(clock) testScope.launch { - val res = createProvisioner( + val (_, provisioner, scheduler) = createProvisioner( this, clock, environmentReader, allocationPolicy, tracer ) - scheduler = res.second - attachMonitor(this, clock, scheduler, monitor) processTrace( this, @@ -187,17 +185,18 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - scheduler.terminate() + scheduler.close() monitor.close() + provisioner.close() } runSimulation() // Note that these values have been verified beforehand assertAll( - { assertEquals(710487768664, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(118846235815, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(584211294239, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(705128393965, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } |
