diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-09-23 12:27:09 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-09-23 12:27:09 +0200 |
| commit | 3d5eb562227dcad5a8a60f31b96e6d68f7774fb2 (patch) | |
| tree | e99c4bf7e5647341c1e269797f7f46099753436f | |
| parent | d97356cf696dedb6c26fc42d9d7c44a977264dcd (diff) | |
refactor(compute): Provide access to instances in compute service
This change updates the interface of `ComputeService` to provide access
to the instances (servers) that have been registered with the compute
service. This allows metric collectors to query the metrics of the
servers that are currently running.
7 files changed, 49 insertions, 54 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index c0b70268..28ef7c40 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -37,6 +37,11 @@ import kotlin.coroutines.CoroutineContext */ public interface ComputeService : AutoCloseable { /** + * The servers that are registered with the "compute" service. + */ + public val servers: List<Server> + + /** * The hosts that are registered with the "compute" service. */ public val hosts: Set<Host> diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 52ee780b..519cf6c6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -91,17 +91,18 @@ internal class ComputeServiceImpl( /** * The registered flavors for this compute service. */ - internal val flavors = mutableMapOf<UUID, InternalFlavor>() + internal val flavorById = mutableMapOf<UUID, InternalFlavor>() /** * The registered images for this compute service. */ - internal val images = mutableMapOf<UUID, InternalImage>() + internal val imageById = mutableMapOf<UUID, InternalImage>() /** * The registered servers for this compute service. */ - private val servers = mutableMapOf<UUID, InternalServer>() + private val serverById = mutableMapOf<UUID, InternalServer>() + override val servers: MutableList<Server> = mutableListOf() private var maxCores = 0 private var maxMemory = 0L @@ -127,13 +128,13 @@ internal class ComputeServiceImpl( override suspend fun queryFlavors(): List<Flavor> { check(!isClosed) { "Client is already closed" } - return flavors.values.map { ClientFlavor(it) } + return flavorById.values.map { ClientFlavor(it) } } override suspend fun findFlavor(id: UUID): Flavor? { check(!isClosed) { "Client is already closed" } - return flavors[id]?.let { ClientFlavor(it) } + return flavorById[id]?.let { ClientFlavor(it) } } override suspend fun newFlavor( @@ -156,7 +157,7 @@ internal class ComputeServiceImpl( meta ) - flavors[uid] = flavor + flavorById[uid] = flavor return ClientFlavor(flavor) } @@ -164,13 +165,13 @@ internal class ComputeServiceImpl( override suspend fun queryImages(): List<Image> { check(!isClosed) { "Client is already closed" } - return images.values.map { ClientImage(it) } + return imageById.values.map { ClientImage(it) } } override suspend fun findImage(id: UUID): Image? { check(!isClosed) { "Client is already closed" } - return images[id]?.let { ClientImage(it) } + return imageById[id]?.let { ClientImage(it) } } override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image { @@ -179,7 +180,7 @@ internal class ComputeServiceImpl( val uid = UUID(clock.millis(), random.nextLong()) val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) - images[uid] = image + imageById[uid] = image return ClientImage(image) } @@ -199,13 +200,14 @@ internal class ComputeServiceImpl( this@ComputeServiceImpl, uid, name, - requireNotNull(flavors[flavor.uid]) { "Unknown flavor" }, - requireNotNull(images[image.uid]) { "Unknown image" }, + requireNotNull(flavorById[flavor.uid]) { "Unknown flavor" }, + requireNotNull(imageById[image.uid]) { "Unknown image" }, labels.toMutableMap(), meta.toMutableMap() ) - servers[uid] = server + serverById[uid] = server + servers.add(server) if (start) { server.start() @@ -217,13 +219,13 @@ internal class ComputeServiceImpl( override suspend fun findServer(id: UUID): Server? { check(!isClosed) { "Client is already closed" } - return servers[id]?.let { ClientServer(it) } + return serverById[id]?.let { ClientServer(it) } } override suspend fun queryServers(): List<Server> { check(!isClosed) { "Client is already closed" } - return servers.values.map { ClientServer(it) } + return serverById.values.map { ClientServer(it) } } override fun close() { @@ -263,7 +265,11 @@ internal class ComputeServiceImpl( } override fun lookupHost(server: Server): Host? { - val internal = requireNotNull(servers[server.uid]) { "Invalid server passed to lookupHost" } + if (server is InternalServer) { + return server.host + } + + val internal = requireNotNull(serverById[server.uid]) { "Invalid server passed to lookupHost" } return internal.host } @@ -296,15 +302,16 @@ internal class ComputeServiceImpl( } internal fun delete(flavor: InternalFlavor) { - flavors.remove(flavor.uid) + flavorById.remove(flavor.uid) } internal fun delete(image: InternalImage) { - images.remove(image.uid) + imageById.remove(image.uid) } internal fun delete(server: InternalServer) { - servers.remove(server.uid) + serverById.remove(server.uid) + servers.remove(server) } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 4c07b785..f6744123 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -26,7 +26,6 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.yield -import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost @@ -59,7 +58,7 @@ public class ComputeServiceHelper( /** * The [ComputeService] that has been configured by the manager. */ - public val service: ComputeService + public val service: ComputeService = ComputeService(context, clock, scheduler, schedulingQuantum) /** * The [FlowEngine] to simulate the hosts. @@ -76,29 +75,23 @@ public class ComputeServiceHelper( */ private val random = SplittableRandom(seed) - init { - val service = createService(scheduler, schedulingQuantum) - this.service = service - } - /** * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. * * @param trace The trace to simulate. - * @param servers A list to which the created servers is added. * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). * @param failureModel A failure model to use for injecting failures. * @param interference A flag to indicate that VM interference needs to be enabled. */ public suspend fun run( trace: List<VirtualMachine>, - servers: MutableList<Server>? = null, submitImmediately: Boolean = false, failureModel: FailureModel? = null, interference: Boolean = false, ) { val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong())) val client = service.newClient() + val clock = clock // Create new image for the virtual machine val image = client.newImage("vm-image") @@ -147,8 +140,6 @@ public class ComputeServiceHelper( meta = meta ) - servers?.add(server) - // Wait for the server reach its end time val endTime = entry.stopTime.toEpochMilli() delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt index 45bd9ab1..a0ec4bd6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt @@ -42,7 +42,6 @@ import java.time.Instant * @param scope The [CoroutineScope] to run the reader in. * @param clock The virtual clock. * @param service The [ComputeService] to monitor. - * @param servers The [Server]s to monitor. * @param monitor The monitor to export the metrics to. * @param exportInterval The export interval. */ @@ -50,7 +49,6 @@ public class ComputeMetricReader( scope: CoroutineScope, clock: Clock, private val service: ComputeService, - private val servers: List<Server>, private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { @@ -76,6 +74,11 @@ public class ComputeMetricReader( */ private val job = scope.launch { val intervalMs = exportInterval.toMillis() + val service = service + val monitor = monitor + val hostTableReaders = hostTableReaders + val serverTableReaders = serverTableReaders + val serviceTableReader = serviceTableReader try { while (isActive) { @@ -91,7 +94,7 @@ public class ComputeMetricReader( reader.reset() } - for (server in servers) { + for (server in service.servers) { val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } reader.record(now) monitor.record(reader) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index 98702b4c..dbb5ced3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import org.opendc.compute.api.Server import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler @@ -78,7 +77,6 @@ public class CapelinRunner( val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) - val servers = mutableListOf<Server>() val partitions = scenario.partitions + ("seed" to seed.toString()) val partition = partitions.map { (k, v) -> "$k=$v" }.joinToString("/") val exporter = if (outputPath != null) { @@ -86,7 +84,6 @@ public class CapelinRunner( this, clock, runner.service, - servers, ParquetComputeMonitor( outputPath, partition, @@ -103,7 +100,7 @@ public class CapelinRunner( runner.apply(topology, optimize = true) // Run the workload trace - runner.run(vms, servers, failureModel = failureModel, interference = operationalPhenomena.hasInterference) + runner.run(vms, failureModel = failureModel, interference = operationalPhenomena.hasInterference) // Stop the metric collection exporter?.close() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index bf8c2758..eae3c993 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -26,7 +26,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Server import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter @@ -90,13 +89,11 @@ class CapelinIntegrationTest { seed, ) val topology = createTopology() - - val servers = mutableListOf<Server>() - val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, runner.service, monitor) try { runner.apply(topology) - runner.run(workload, servers) + runner.run(workload) val serviceMetrics = runner.service.getSchedulerStats() println( @@ -140,12 +137,11 @@ class CapelinIntegrationTest { seed, ) val topology = createTopology("single") - val servers = mutableListOf<Server>() - val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, runner.service, monitor) try { runner.apply(topology) - runner.run(workload, servers) + runner.run(workload) val serviceMetrics = runner.service.getSchedulerStats() println( @@ -186,12 +182,11 @@ class CapelinIntegrationTest { seed ) val topology = createTopology("single") - val servers = mutableListOf<Server>() - val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, simulator.service, monitor) try { simulator.apply(topology) - simulator.run(workload, servers, interference = true) + simulator.run(workload, interference = true) val serviceMetrics = simulator.service.getSchedulerStats() println( @@ -230,12 +225,11 @@ class CapelinIntegrationTest { ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val servers = mutableListOf<Server>() - val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, simulator.service, monitor) try { simulator.apply(topology) - simulator.run(workload, servers, failureModel = grid5000(Duration.ofDays(7))) + simulator.run(workload, failureModel = grid5000(Duration.ofDays(7))) val serviceMetrics = simulator.service.getSchedulerStats() println( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 570920f3..9a1319b6 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -23,7 +23,6 @@ package org.opendc.web.runner import mu.KotlinLogging -import org.opendc.compute.api.Server import org.opendc.compute.workload.* import org.opendc.compute.workload.telemetry.ComputeMetricReader import org.opendc.compute.workload.topology.HostSpec @@ -222,14 +221,13 @@ public class OpenDCRunner( computeScheduler, seed = 0L, ) - val servers = mutableListOf<Server>() - val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, simulator.service, monitor) try { // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(vms, servers, failureModel = failureModel, interference = phenomena.interference) + simulator.run(vms, failureModel = failureModel, interference = phenomena.interference) val serviceMetrics = simulator.service.getSchedulerStats() logger.debug { |
