summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-service/build.gradle.kts1
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt19
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt6
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt6
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt134
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt20
6 files changed, 160 insertions, 26 deletions
diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts
index e0e48b0f..33cafc45 100644
--- a/opendc-compute/opendc-compute-service/build.gradle.kts
+++ b/opendc-compute/opendc-compute-service/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
api(projects.opendcTelemetry.opendcTelemetryApi)
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
+ implementation(libs.opentelemetry.semconv)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testRuntimeOnly(libs.log4j.slf4j)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index d7a7e8f8..f1c055d4 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,7 +22,9 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -160,6 +162,15 @@ internal class ComputeServiceImpl(
.build()
/**
+ * The response time of the service.
+ */
+ private val _schedulerDuration = meter.histogramBuilder("scheduler.duration")
+ .setDescription("End to end latency for a server to be scheduled (in multiple attempts)")
+ .ofLongs()
+ .setUnit("ms")
+ .build()
+
+ /**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
private var timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
@@ -325,7 +336,7 @@ internal class ComputeServiceImpl(
internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
- val request = SchedulingRequest(server)
+ val request = SchedulingRequest(server, clock.millis())
queue.add(request)
_submittedServers.add(1)
_waitingServers.add(1)
@@ -368,6 +379,7 @@ internal class ComputeServiceImpl(
* Run a single scheduling iteration.
*/
private fun doSchedule() {
+ val now = clock.millis()
while (queue.isNotEmpty()) {
val request = queue.peek()
@@ -390,7 +402,7 @@ internal class ComputeServiceImpl(
logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
- server.state = ServerState.ERROR
+ server.state = ServerState.TERMINATED
continue
} else {
break
@@ -402,6 +414,7 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
_waitingServers.add(-1)
+ _schedulerDuration.record(now - request.submitTime, Attributes.of(ResourceAttributes.HOST_ID, server.uid.toString()))
logger.info { "Assigned server $server to host $host." }
@@ -430,7 +443,7 @@ internal class ComputeServiceImpl(
/**
* A request to schedule an [InternalServer] onto one of the [Host]s.
*/
- internal data class SchedulingRequest(val server: InternalServer) {
+ internal data class SchedulingRequest(val server: InternalServer, val submitTime: Long) {
/**
* A flag to indicate that the request is cancelled.
*/
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index c6c01ea2..d036ec00 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -170,7 +170,7 @@ internal class ComputeServiceTest {
server.start()
delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -183,7 +183,7 @@ internal class ComputeServiceTest {
server.start()
delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
@@ -196,7 +196,7 @@ internal class ComputeServiceTest {
server.start()
delay(5L * 60 * 1000)
server.refresh()
- assertEquals(ServerState.ERROR, server.state)
+ assertEquals(ServerState.TERMINATED, server.state)
}
@Test
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
index 20ea8d20..28fd8217 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -102,7 +102,7 @@ class InternalServerTest {
val image = mockk<InternalImage>()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) }
+ every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer, 0) }
server.start()
@@ -160,7 +160,7 @@ class InternalServerTest {
val flavor = mockk<InternalFlavor>()
val image = mockk<InternalImage>()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- val request = ComputeServiceImpl.SchedulingRequest(server)
+ val request = ComputeServiceImpl.SchedulingRequest(server, 0)
every { service.schedule(any()) } returns request
@@ -223,7 +223,7 @@ class InternalServerTest {
val flavor = mockk<InternalFlavor>()
val image = mockk<InternalImage>()
val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
- val request = ComputeServiceImpl.SchedulingRequest(server)
+ val request = ComputeServiceImpl.SchedulingRequest(server, 0)
every { service.schedule(any()) } returns request
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 20e5a9db..213d20ee 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.simulator
+import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
@@ -59,7 +60,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
interpreter: SimResourceInterpreter,
- meter: Meter,
+ private val meter: Meter,
hypervisor: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
@@ -72,6 +73,11 @@ public class SimHost(
override val scope: CoroutineScope = CoroutineScope(context + Job())
/**
+ * The clock instance used by the host.
+ */
+ private val clock = interpreter.clock
+
+ /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -82,11 +88,6 @@ public class SimHost(
private val listeners = mutableListOf<HostListener>()
/**
- * Current total memory use of the images on this hypervisor.
- */
- private var availableMemory: Long = model.memory.sumOf { it.size }
-
- /**
* The machine to run on.
*/
public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, powerDriver)
@@ -115,6 +116,8 @@ public class SimHost(
_cpuDemand.record(cpuDemand)
_cpuUsage.record(cpuUsage)
_powerUsage.record(machine.powerDraw)
+
+ reportTime()
}
}
)
@@ -221,6 +224,33 @@ public class SimHost(
.build()
.bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+ /**
+ * The amount of time in the system.
+ */
+ private val _totalTime = meter.counterBuilder("host.time.total")
+ .setDescription("The amount of time in the system")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+
+ /**
+ * The uptime of the host.
+ */
+ private val _upTime = meter.counterBuilder("host.time.up")
+ .setDescription("The uptime of the host")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+
+ /**
+ * The downtime of the host.
+ */
+ private val _downTime = meter.counterBuilder("host.time.down")
+ .setDescription("The downtime of the host")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString()))
+
init {
// Launch hypervisor onto machine
scope.launch {
@@ -238,25 +268,44 @@ public class SimHost(
}
}
+ private var _lastReport = clock.millis()
+
+ private fun reportTime() {
+ if (!scope.isActive)
+ return
+
+ val now = clock.millis()
+ val duration = now - _lastReport
+
+ _totalTime.add(duration)
+ when (_state) {
+ HostState.UP -> _upTime.add(duration)
+ HostState.DOWN -> _downTime.add(duration)
+ }
+
+ // Track time of guests
+ for (guest in guests.values) {
+ guest.reportTime()
+ }
+
+ _lastReport = now
+ }
+
override fun canFit(server: Server): Boolean {
- val sufficientMemory = availableMemory > server.flavor.memorySize
- val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount
+ val sufficientMemory = model.memorySize >= server.flavor.memorySize
+ val enoughCpus = model.cpuCount >= server.flavor.cpuCount
val canFit = hypervisor.canFit(server.flavor.toMachineModel())
return sufficientMemory && enoughCpus && canFit
}
override suspend fun spawn(server: Server, start: Boolean) {
- // Return if the server already exists on this host
- if (server in this) {
- return
+ val guest = guests.computeIfAbsent(server) { key ->
+ require(canFit(key)) { "Server does not fit" }
+ _guests.add(1)
+ Guest(key, hypervisor.createMachine(key.flavor.toMachineModel(), key.name))
}
- require(canFit(server)) { "Server does not fit" }
- val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel(), server.name))
- guests[server] = guest
- _guests.add(1)
-
if (start) {
guest.start()
}
@@ -291,6 +340,7 @@ public class SimHost(
}
override fun close() {
+ reportTime()
scope.cancel()
machine.close()
}
@@ -320,6 +370,7 @@ public class SimHost(
}
override suspend fun fail() {
+ reportTime()
_state = HostState.DOWN
for (guest in guests.values) {
guest.fail()
@@ -327,6 +378,7 @@ public class SimHost(
}
override suspend fun recover() {
+ reportTime()
_state = HostState.UP
for (guest in guests.values) {
guest.start()
@@ -339,6 +391,33 @@ public class SimHost(
private inner class Guest(val server: Server, val machine: SimMachine) {
var state: ServerState = ServerState.TERMINATED
+ /**
+ * The amount of time in the system.
+ */
+ private val _totalTime = meter.counterBuilder("guest.time.total")
+ .setDescription("The amount of time in the system")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
+
+ /**
+ * The uptime of the guest.
+ */
+ private val _runningTime = meter.counterBuilder("guest.time.running")
+ .setDescription("The uptime of the guest")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
+
+ /**
+ * The time the guest is in an error state.
+ */
+ private val _errorTime = meter.counterBuilder("guest.time.error")
+ .setDescription("The time the guest is in an error state")
+ .setUnit("ms")
+ .build()
+ .bind(Attributes.of(AttributeKey.stringKey("server.id"), server.uid.toString()))
+
suspend fun start() {
when (state) {
ServerState.TERMINATED, ServerState.ERROR -> {
@@ -373,6 +452,9 @@ public class SimHost(
}
suspend fun fail() {
+ if (state != ServerState.RUNNING) {
+ return
+ }
stop()
state = ServerState.ERROR
}
@@ -414,8 +496,26 @@ public class SimHost(
else
ServerState.ERROR
- availableMemory += server.flavor.memorySize
onGuestStop(this)
}
+
+ private var _lastReport = clock.millis()
+
+ fun reportTime() {
+ if (state == ServerState.DELETED)
+ return
+
+ val now = clock.millis()
+ val duration = now - _lastReport
+
+ _totalTime.add(duration)
+ when (state) {
+ ServerState.RUNNING -> _runningTime.add(duration)
+ ServerState.ERROR -> _errorTime.add(duration)
+ else -> {}
+ }
+
+ _lastReport = now
+ }
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 1ba3a9a1..31215e9a 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -189,6 +189,10 @@ internal class SimHostTest {
fun testFailure() = runBlockingSimulation {
var requestedWork = 0L
var grantedWork = 0L
+ var totalTime = 0L
+ var downTime = 0L
+ var guestTotalTime = 0L
+ var guestDownTime = 0L
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
@@ -238,6 +242,18 @@ internal class SimHostTest {
metricsByName["cpu.work.granted"]?.let {
grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong()
}
+ metricsByName["host.time.total"]?.let {
+ totalTime = it.longSumData.points.first().value
+ }
+ metricsByName["host.time.down"]?.let {
+ downTime = it.longSumData.points.first().value
+ }
+ metricsByName["guest.time.total"]?.let {
+ guestTotalTime = it.longSumData.points.first().value
+ }
+ metricsByName["guest.time.error"]?.let {
+ guestDownTime = it.longSumData.points.first().value
+ }
return CompletableResultCode.ofSuccess()
}
@@ -275,6 +291,10 @@ internal class SimHostTest {
assertAll(
{ assertEquals(2226039, requestedWork, "Total time does not match") },
{ assertEquals(1086039, grantedWork, "Down time does not match") },
+ { assertEquals(1200001, totalTime, "Total time does not match") },
+ { assertEquals(1200001, guestTotalTime, "Guest total time does not match") },
+ { assertEquals(5000, downTime, "Down time does not match") },
+ { assertEquals(5000, guestDownTime, "Guest down time does not match") },
)
}