summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-23 12:27:09 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-23 12:27:09 +0200
commit3d5eb562227dcad5a8a60f31b96e6d68f7774fb2 (patch)
treee99c4bf7e5647341c1e269797f7f46099753436f
parentd97356cf696dedb6c26fc42d9d7c44a977264dcd (diff)
refactor(compute): Provide access to instances in compute service
This change updates the interface of `ComputeService` to provide access to the instances (servers) that have been registered with the compute service. This allows metric collectors to query the metrics of the servers that are currently running.
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt5
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt43
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt13
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt9
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt5
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt22
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt6
7 files changed, 49 insertions, 54 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index c0b70268..28ef7c40 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -37,6 +37,11 @@ import kotlin.coroutines.CoroutineContext
*/
public interface ComputeService : AutoCloseable {
/**
+ * The servers that are registered with the "compute" service.
+ */
+ public val servers: List<Server>
+
+ /**
* The hosts that are registered with the "compute" service.
*/
public val hosts: Set<Host>
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 52ee780b..519cf6c6 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -91,17 +91,18 @@ internal class ComputeServiceImpl(
/**
* The registered flavors for this compute service.
*/
- internal val flavors = mutableMapOf<UUID, InternalFlavor>()
+ internal val flavorById = mutableMapOf<UUID, InternalFlavor>()
/**
* The registered images for this compute service.
*/
- internal val images = mutableMapOf<UUID, InternalImage>()
+ internal val imageById = mutableMapOf<UUID, InternalImage>()
/**
* The registered servers for this compute service.
*/
- private val servers = mutableMapOf<UUID, InternalServer>()
+ private val serverById = mutableMapOf<UUID, InternalServer>()
+ override val servers: MutableList<Server> = mutableListOf()
private var maxCores = 0
private var maxMemory = 0L
@@ -127,13 +128,13 @@ internal class ComputeServiceImpl(
override suspend fun queryFlavors(): List<Flavor> {
check(!isClosed) { "Client is already closed" }
- return flavors.values.map { ClientFlavor(it) }
+ return flavorById.values.map { ClientFlavor(it) }
}
override suspend fun findFlavor(id: UUID): Flavor? {
check(!isClosed) { "Client is already closed" }
- return flavors[id]?.let { ClientFlavor(it) }
+ return flavorById[id]?.let { ClientFlavor(it) }
}
override suspend fun newFlavor(
@@ -156,7 +157,7 @@ internal class ComputeServiceImpl(
meta
)
- flavors[uid] = flavor
+ flavorById[uid] = flavor
return ClientFlavor(flavor)
}
@@ -164,13 +165,13 @@ internal class ComputeServiceImpl(
override suspend fun queryImages(): List<Image> {
check(!isClosed) { "Client is already closed" }
- return images.values.map { ClientImage(it) }
+ return imageById.values.map { ClientImage(it) }
}
override suspend fun findImage(id: UUID): Image? {
check(!isClosed) { "Client is already closed" }
- return images[id]?.let { ClientImage(it) }
+ return imageById[id]?.let { ClientImage(it) }
}
override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image {
@@ -179,7 +180,7 @@ internal class ComputeServiceImpl(
val uid = UUID(clock.millis(), random.nextLong())
val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta)
- images[uid] = image
+ imageById[uid] = image
return ClientImage(image)
}
@@ -199,13 +200,14 @@ internal class ComputeServiceImpl(
this@ComputeServiceImpl,
uid,
name,
- requireNotNull(flavors[flavor.uid]) { "Unknown flavor" },
- requireNotNull(images[image.uid]) { "Unknown image" },
+ requireNotNull(flavorById[flavor.uid]) { "Unknown flavor" },
+ requireNotNull(imageById[image.uid]) { "Unknown image" },
labels.toMutableMap(),
meta.toMutableMap()
)
- servers[uid] = server
+ serverById[uid] = server
+ servers.add(server)
if (start) {
server.start()
@@ -217,13 +219,13 @@ internal class ComputeServiceImpl(
override suspend fun findServer(id: UUID): Server? {
check(!isClosed) { "Client is already closed" }
- return servers[id]?.let { ClientServer(it) }
+ return serverById[id]?.let { ClientServer(it) }
}
override suspend fun queryServers(): List<Server> {
check(!isClosed) { "Client is already closed" }
- return servers.values.map { ClientServer(it) }
+ return serverById.values.map { ClientServer(it) }
}
override fun close() {
@@ -263,7 +265,11 @@ internal class ComputeServiceImpl(
}
override fun lookupHost(server: Server): Host? {
- val internal = requireNotNull(servers[server.uid]) { "Invalid server passed to lookupHost" }
+ if (server is InternalServer) {
+ return server.host
+ }
+
+ val internal = requireNotNull(serverById[server.uid]) { "Invalid server passed to lookupHost" }
return internal.host
}
@@ -296,15 +302,16 @@ internal class ComputeServiceImpl(
}
internal fun delete(flavor: InternalFlavor) {
- flavors.remove(flavor.uid)
+ flavorById.remove(flavor.uid)
}
internal fun delete(image: InternalImage) {
- images.remove(image.uid)
+ imageById.remove(image.uid)
}
internal fun delete(server: InternalServer) {
- servers.remove(server.uid)
+ serverById.remove(server.uid)
+ servers.remove(server)
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
index 4c07b785..f6744123 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
@@ -26,7 +26,6 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
-import org.opendc.compute.api.Server
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
@@ -59,7 +58,7 @@ public class ComputeServiceHelper(
/**
* The [ComputeService] that has been configured by the manager.
*/
- public val service: ComputeService
+ public val service: ComputeService = ComputeService(context, clock, scheduler, schedulingQuantum)
/**
* The [FlowEngine] to simulate the hosts.
@@ -76,29 +75,23 @@ public class ComputeServiceHelper(
*/
private val random = SplittableRandom(seed)
- init {
- val service = createService(scheduler, schedulingQuantum)
- this.service = service
- }
-
/**
* Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
*
* @param trace The trace to simulate.
- * @param servers A list to which the created servers is added.
* @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
* @param failureModel A failure model to use for injecting failures.
* @param interference A flag to indicate that VM interference needs to be enabled.
*/
public suspend fun run(
trace: List<VirtualMachine>,
- servers: MutableList<Server>? = null,
submitImmediately: Boolean = false,
failureModel: FailureModel? = null,
interference: Boolean = false,
) {
val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong()))
val client = service.newClient()
+ val clock = clock
// Create new image for the virtual machine
val image = client.newImage("vm-image")
@@ -147,8 +140,6 @@ public class ComputeServiceHelper(
meta = meta
)
- servers?.add(server)
-
// Wait for the server reach its end time
val endTime = entry.stopTime.toEpochMilli()
delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
index 45bd9ab1..a0ec4bd6 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
@@ -42,7 +42,6 @@ import java.time.Instant
* @param scope The [CoroutineScope] to run the reader in.
* @param clock The virtual clock.
* @param service The [ComputeService] to monitor.
- * @param servers The [Server]s to monitor.
* @param monitor The monitor to export the metrics to.
* @param exportInterval The export interval.
*/
@@ -50,7 +49,6 @@ public class ComputeMetricReader(
scope: CoroutineScope,
clock: Clock,
private val service: ComputeService,
- private val servers: List<Server>,
private val monitor: ComputeMonitor,
private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
@@ -76,6 +74,11 @@ public class ComputeMetricReader(
*/
private val job = scope.launch {
val intervalMs = exportInterval.toMillis()
+ val service = service
+ val monitor = monitor
+ val hostTableReaders = hostTableReaders
+ val serverTableReaders = serverTableReaders
+ val serviceTableReader = serviceTableReader
try {
while (isActive) {
@@ -91,7 +94,7 @@ public class ComputeMetricReader(
reader.reset()
}
- for (server in servers) {
+ for (server in service.servers) {
val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
reader.record(now)
monitor.record(reader)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
index 98702b4c..dbb5ced3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin
-import org.opendc.compute.api.Server
import org.opendc.compute.workload.ComputeServiceHelper
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.createComputeScheduler
@@ -78,7 +77,6 @@ public class CapelinRunner(
val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt"))
- val servers = mutableListOf<Server>()
val partitions = scenario.partitions + ("seed" to seed.toString())
val partition = partitions.map { (k, v) -> "$k=$v" }.joinToString("/")
val exporter = if (outputPath != null) {
@@ -86,7 +84,6 @@ public class CapelinRunner(
this,
clock,
runner.service,
- servers,
ParquetComputeMonitor(
outputPath,
partition,
@@ -103,7 +100,7 @@ public class CapelinRunner(
runner.apply(topology, optimize = true)
// Run the workload trace
- runner.run(vms, servers, failureModel = failureModel, interference = operationalPhenomena.hasInterference)
+ runner.run(vms, failureModel = failureModel, interference = operationalPhenomena.hasInterference)
// Stop the metric collection
exporter?.close()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index bf8c2758..eae3c993 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -26,7 +26,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.api.Server
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
@@ -90,13 +89,11 @@ class CapelinIntegrationTest {
seed,
)
val topology = createTopology()
-
- val servers = mutableListOf<Server>()
- val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
+ val reader = ComputeMetricReader(this, clock, runner.service, monitor)
try {
runner.apply(topology)
- runner.run(workload, servers)
+ runner.run(workload)
val serviceMetrics = runner.service.getSchedulerStats()
println(
@@ -140,12 +137,11 @@ class CapelinIntegrationTest {
seed,
)
val topology = createTopology("single")
- val servers = mutableListOf<Server>()
- val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
+ val reader = ComputeMetricReader(this, clock, runner.service, monitor)
try {
runner.apply(topology)
- runner.run(workload, servers)
+ runner.run(workload)
val serviceMetrics = runner.service.getSchedulerStats()
println(
@@ -186,12 +182,11 @@ class CapelinIntegrationTest {
seed
)
val topology = createTopology("single")
- val servers = mutableListOf<Server>()
- val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
+ val reader = ComputeMetricReader(this, clock, simulator.service, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, servers, interference = true)
+ simulator.run(workload, interference = true)
val serviceMetrics = simulator.service.getSchedulerStats()
println(
@@ -230,12 +225,11 @@ class CapelinIntegrationTest {
)
val topology = createTopology("single")
val workload = createTestWorkload(0.25, seed)
- val servers = mutableListOf<Server>()
- val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
+ val reader = ComputeMetricReader(this, clock, simulator.service, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, servers, failureModel = grid5000(Duration.ofDays(7)))
+ simulator.run(workload, failureModel = grid5000(Duration.ofDays(7)))
val serviceMetrics = simulator.service.getSchedulerStats()
println(
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 570920f3..9a1319b6 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -23,7 +23,6 @@
package org.opendc.web.runner
import mu.KotlinLogging
-import org.opendc.compute.api.Server
import org.opendc.compute.workload.*
import org.opendc.compute.workload.telemetry.ComputeMetricReader
import org.opendc.compute.workload.topology.HostSpec
@@ -222,14 +221,13 @@ public class OpenDCRunner(
computeScheduler,
seed = 0L,
)
- val servers = mutableListOf<Server>()
- val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
+ val reader = ComputeMetricReader(this, clock, simulator.service, monitor)
try {
// Instantiate the topology onto the simulator
simulator.apply(topology)
// Run workload trace
- simulator.run(vms, servers, failureModel = failureModel, interference = phenomena.interference)
+ simulator.run(vms, failureModel = failureModel, interference = phenomena.interference)
val serviceMetrics = simulator.service.getSchedulerStats()
logger.debug {