From 0bbb0adb97ba4783bbd0073f845781725e6212e8 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 25 Mar 2021 10:23:47 +0100 Subject: compute: Add test suite for ComputeService This change adds a test suite for the OpenDC compute service. --- .../opendc-compute-service/build.gradle.kts | 3 +- .../compute/service/internal/ClientFlavor.kt | 6 + .../opendc/compute/service/internal/ClientImage.kt | 6 + .../compute/service/internal/ClientServer.kt | 6 + .../compute/service/internal/ComputeServiceImpl.kt | 274 ++++++++------- .../opendc/compute/service/internal/HostView.kt | 2 + .../compute/service/internal/InternalFlavor.kt | 4 +- .../compute/service/internal/InternalImage.kt | 4 +- .../compute/service/internal/InternalServer.kt | 46 ++- .../service/scheduler/ReplayAllocationPolicy.kt | 7 +- .../opendc/compute/service/ComputeServiceTest.kt | 390 +++++++++++++++++++++ .../opendc/compute/service/InternalFlavorTest.kt | 80 +++++ .../opendc/compute/service/InternalImageTest.kt | 81 +++++ .../opendc/compute/service/InternalServerTest.kt | 285 +++++++++++++++ .../service/scheduler/AllocationPolicyTest.kt | 219 ++++++++++++ .../src/test/resources/log4j2.xml | 38 ++ .../org/opendc/experiments/capelin/Portfolio.kt | 8 +- .../src/main/kotlin/org/opendc/runner/web/Main.kt | 1 - 18 files changed, 1298 insertions(+), 162 deletions(-) create mode 100644 simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt create mode 100644 simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt create mode 100644 simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt create mode 100644 simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt create mode 100644 
simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt create mode 100644 simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml (limited to 'simulator') diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts index 1b09ef6d..1825e989 100644 --- a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts @@ -26,6 +26,7 @@ description = "OpenDC Compute Service implementation" plugins { `kotlin-library-conventions` `testing-conventions` + `jacoco-conventions` } dependencies { @@ -36,5 +37,5 @@ dependencies { implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) - testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt index 29f10e27..4a8d3046 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt @@ -59,4 +59,10 @@ internal class ClientFlavor(private val delegate: Flavor) : Flavor { labels = delegate.labels meta = delegate.meta } + + override fun equals(other: Any?): Boolean = other is Flavor && other.uid == uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Flavor[uid=$uid,name=$name]" } diff --git 
a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt index 6c5b2ab0..e0b5c171 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt @@ -52,4 +52,10 @@ internal class ClientImage(private val delegate: Image) : Image { labels = delegate.labels meta = delegate.meta } + + override fun equals(other: Any?): Boolean = other is Image && other.uid == uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Image[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index ae4cee3b..f2929bf3 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -104,4 +104,10 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche watcher.onStateChanged(this, newState) } } + + override fun equals(other: Any?): Boolean = other is Server && other.uid == uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Server[uid=$uid,name=$name,state=$state]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 
aa7e0aa1..62808b4d 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -57,7 +57,7 @@ public class ComputeServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - private val scope = CoroutineScope(context) + private val scope = CoroutineScope(context + Job()) /** * The logger instance of this server. @@ -133,130 +133,133 @@ public class ComputeServiceImpl( override val hostCount: Int get() = hostToView.size - override fun newClient(): ComputeClient = object : ComputeClient { - private var isClosed: Boolean = false + override fun newClient(): ComputeClient { + check(scope.isActive) { "Service is already closed" } + return object : ComputeClient { + private var isClosed: Boolean = false - override suspend fun queryFlavors(): List { - check(!isClosed) { "Client is already closed" } + override suspend fun queryFlavors(): List { + check(!isClosed) { "Client is already closed" } - return flavors.values.map { ClientFlavor(it) } - } + return flavors.values.map { ClientFlavor(it) } + } - override suspend fun findFlavor(id: UUID): Flavor? { - check(!isClosed) { "Client is already closed" } + override suspend fun findFlavor(id: UUID): Flavor? 
{ + check(!isClosed) { "Client is already closed" } - return flavors[id]?.let { ClientFlavor(it) } - } + return flavors[id]?.let { ClientFlavor(it) } + } - override suspend fun newFlavor( - name: String, - cpuCount: Int, - memorySize: Long, - labels: Map, - meta: Map - ): Flavor { - check(!isClosed) { "Client is already closed" } - - val uid = UUID(clock.millis(), random.nextLong()) - val flavor = InternalFlavor( - this@ComputeServiceImpl, - uid, - name, - cpuCount, - memorySize, - labels, - meta - ) + override suspend fun newFlavor( + name: String, + cpuCount: Int, + memorySize: Long, + labels: Map, + meta: Map + ): Flavor { + check(!isClosed) { "Client is already closed" } + + val uid = UUID(clock.millis(), random.nextLong()) + val flavor = InternalFlavor( + this@ComputeServiceImpl, + uid, + name, + cpuCount, + memorySize, + labels, + meta + ) - flavors[uid] = flavor + flavors[uid] = flavor - return ClientFlavor(flavor) - } + return ClientFlavor(flavor) + } - override suspend fun queryImages(): List { - check(!isClosed) { "Client is already closed" } + override suspend fun queryImages(): List { + check(!isClosed) { "Client is already closed" } - return images.values.map { ClientImage(it) } - } + return images.values.map { ClientImage(it) } + } - override suspend fun findImage(id: UUID): Image? { - check(!isClosed) { "Client is already closed" } + override suspend fun findImage(id: UUID): Image? 
{ + check(!isClosed) { "Client is already closed" } - return images[id]?.let { ClientImage(it) } - } + return images[id]?.let { ClientImage(it) } + } - override suspend fun newImage(name: String, labels: Map, meta: Map): Image { - check(!isClosed) { "Client is already closed" } + override suspend fun newImage(name: String, labels: Map, meta: Map): Image { + check(!isClosed) { "Client is already closed" } - val uid = UUID(clock.millis(), random.nextLong()) - val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) + val uid = UUID(clock.millis(), random.nextLong()) + val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) - images[uid] = image + images[uid] = image - return ClientImage(image) - } + return ClientImage(image) + } - override suspend fun newServer( - name: String, - image: Image, - flavor: Flavor, - labels: Map, - meta: Map, - start: Boolean - ): Server { - check(!isClosed) { "Client is closed" } - tracer.commit(VmSubmissionEvent(name, image, flavor)) + override suspend fun newServer( + name: String, + image: Image, + flavor: Flavor, + labels: Map, + meta: Map, + start: Boolean + ): Server { + check(!isClosed) { "Client is closed" } + tracer.commit(VmSubmissionEvent(name, image, flavor)) - _events.emit( - ComputeServiceEvent.MetricsAvailable( + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + ) + ) + + val uid = UUID(clock.millis(), random.nextLong()) + val server = InternalServer( this@ComputeServiceImpl, - hostCount, - availableHosts.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms + uid, + name, + requireNotNull(flavors[flavor.uid]) { "Unknown flavor" }, + requireNotNull(images[image.uid]) { "Unknown image" }, + labels.toMutableMap(), + meta.toMutableMap() ) - ) - val uid = UUID(clock.millis(), random.nextLong()) - val server = 
InternalServer( - this@ComputeServiceImpl, - uid, - name, - flavor, - image, - labels.toMutableMap(), - meta.toMutableMap() - ) + servers[uid] = server - servers[uid] = server + if (start) { + server.start() + } - if (start) { - server.start() + return ClientServer(server) } - return ClientServer(server) - } + override suspend fun findServer(id: UUID): Server? { + check(!isClosed) { "Client is already closed" } - override suspend fun findServer(id: UUID): Server? { - check(!isClosed) { "Client is already closed" } + return servers[id]?.let { ClientServer(it) } + } - return servers[id]?.let { ClientServer(it) } - } + override suspend fun queryServers(): List { + check(!isClosed) { "Client is already closed" } - override suspend fun queryServers(): List { - check(!isClosed) { "Client is already closed" } + return servers.values.map { ClientServer(it) } + } - return servers.values.map { ClientServer(it) } - } + override fun close() { + isClosed = true + } - override fun close() { - isClosed = true + override fun toString(): String = "ComputeClient" } - - override fun toString(): String = "ComputeClient" } override fun addHost(host: Host) { @@ -285,23 +288,25 @@ public class ComputeServiceImpl( scope.cancel() } - internal fun schedule(server: InternalServer) { + internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." 
} - queue.add(SchedulingRequest(server)) + val request = SchedulingRequest(server) + queue.add(request) requestSchedulingCycle() + return request } internal fun delete(flavor: InternalFlavor) { - checkNotNull(flavors.remove(flavor.uid)) { "Flavor was not known" } + flavors.remove(flavor.uid) } internal fun delete(image: InternalImage) { - checkNotNull(images.remove(image.uid)) { "Image was not known" } + images.remove(image.uid) } internal fun delete(server: InternalServer) { - checkNotNull(servers.remove(server.uid)) { "Server was not known" } + servers.remove(server.uid) } /** @@ -338,7 +343,7 @@ public class ComputeServiceImpl( val server = request.server val hv = allocationLogic.select(availableHosts, request.server) if (hv == null || !hv.host.canFit(server)) { - logger.trace { "Server $server selected for scheduling but no capacity available for it." } + logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" } if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { tracer.commit(VmSubmissionInvalidEvent(server.name)) @@ -360,6 +365,8 @@ public class ComputeServiceImpl( queue.poll() logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + + server.state = ServerState.ERROR continue } else { break @@ -372,42 +379,39 @@ public class ComputeServiceImpl( queue.poll() logger.info { "Assigned server $server to host $host." } - try { - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. 
- hv.numberOfActiveServers++ - hv.provisionedCores += server.flavor.cpuCount - hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack - - scope.launch { - try { - server.assignHost(host) - host.spawn(server) - activeServers[server] = host - - tracer.commit(VmScheduledEvent(server.name)) - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) + + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + hv.numberOfActiveServers++ + hv.provisionedCores += server.flavor.cpuCount + hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack + + scope.launch { + try { + server.host = host + host.spawn(server) + activeServers[server] = host + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms ) - } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + ) + } catch (e: Throwable) { + logger.error("Failed to deploy VM", e) - hv.numberOfActiveServers-- - hv.provisionedCores -= server.flavor.cpuCount - hv.availableMemory += server.flavor.memorySize - } + hv.numberOfActiveServers-- + hv.provisionedCores -= server.flavor.cpuCount + hv.availableMemory += server.flavor.memorySize } - } catch (e: Exception) { - logger.warn(e) { "Failed to assign server $server to $host. " } } } } @@ -415,7 +419,7 @@ public class ComputeServiceImpl( /** * A request to schedule an [InternalServer] onto one of the [Host]s. */ - private data class SchedulingRequest(val server: InternalServer) { + internal data class SchedulingRequest(val server: InternalServer) { /** * A flag to indicate that the request is cancelled. 
*/ diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt index 1bdfdf1a..5793541f 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt @@ -32,4 +32,6 @@ public class HostView(public val host: Host) { public var numberOfActiveServers: Int = 0 public var availableMemory: Long = host.model.memorySize public var provisionedCores: Int = 0 + + override fun toString(): String = "HostView[host=$host]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt index 95e280df..b8fb6279 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt @@ -58,7 +58,9 @@ internal class InternalFlavor( service.delete(this) } - override fun equals(other: Any?): Boolean = other is InternalFlavor && uid == other.uid + override fun equals(other: Any?): Boolean = other is Flavor && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Flavor[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt index 86f2f6b9..d9ed5896 100644 --- 
a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt @@ -48,7 +48,9 @@ internal class InternalImage( service.delete(this) } - override fun equals(other: Any?): Boolean = other is InternalImage && uid == other.uid + override fun equals(other: Any?): Boolean = other is Image && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Image[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index ff7c1d15..d9d0f3fc 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -34,8 +34,8 @@ internal class InternalServer( private val service: ComputeServiceImpl, override val uid: UUID, override val name: String, - override val flavor: Flavor, - override val image: Image, + override val flavor: InternalFlavor, + override val image: InternalImage, override val labels: MutableMap, override val meta: MutableMap ) : Server { @@ -54,6 +54,11 @@ internal class InternalServer( */ internal var host: Host? = null + /** + * The current scheduling request. + */ + private var request: ComputeServiceImpl.SchedulingRequest? 
= null + override suspend fun start() { when (state) { ServerState.RUNNING -> { @@ -66,35 +71,43 @@ internal class InternalServer( } ServerState.DELETED -> { logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") + throw IllegalStateException("Server is terminated") } else -> { logger.info { "User requested to start server $uid" } state = ServerState.PROVISIONING - service.schedule(this) + assert(request == null) { "Scheduling request already active" } + request = service.schedule(this) } } } override suspend fun stop() { when (state) { - ServerState.PROVISIONING -> {} // TODO Find way to interrupt these + ServerState.PROVISIONING -> { + cancelProvisioningRequest() + state = ServerState.TERMINATED + } ServerState.RUNNING, ServerState.ERROR -> { val host = checkNotNull(host) { "Server not running" } host.stop(this) } - ServerState.TERMINATED -> {} // No work needed - ServerState.DELETED -> throw IllegalStateException("Server is terminated") + ServerState.TERMINATED, ServerState.DELETED -> {} // No work needed } } override suspend fun delete() { when (state) { - ServerState.PROVISIONING -> {} // TODO Find way to interrupt these - ServerState.RUNNING -> { + ServerState.PROVISIONING, ServerState.TERMINATED -> { + cancelProvisioningRequest() + service.delete(this) + state = ServerState.DELETED + } + ServerState.RUNNING, ServerState.ERROR -> { val host = checkNotNull(host) { "Server not running" } host.delete(this) service.delete(this) + state = ServerState.DELETED } else -> {} // No work needed } @@ -121,11 +134,20 @@ internal class InternalServer( field = value } - internal fun assignHost(host: Host) { - this.host = host + /** + * Cancel the provisioning request if active. 
+ */ + private fun cancelProvisioningRequest() { + val request = request + if (request != null) { + this.request = null + request.isCancelled = true + } } - override fun equals(other: Any?): Boolean = other is InternalServer && uid == other.uid + override fun equals(other: Any?): Boolean = other is Server && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Server[uid=$uid,state=$state]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt index ed1dc662..2c953f8b 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt @@ -20,14 +20,11 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView -import org.opendc.compute.service.scheduler.AllocationPolicy - -private val logger = KotlinLogging.logger {} /** * Policy replaying VM-cluster assignment. @@ -36,6 +33,8 @@ private val logger = KotlinLogging.logger {} * assigned the VM image. 
*/ public class ReplayAllocationPolicy(private val vmPlacements: Map) : AllocationPolicy { + private val logger = KotlinLogging.logger {} + override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( hypervisors: Set, diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt new file mode 100644 index 00000000..ffec92ea --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -0,0 +1,390 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.service + +import io.mockk.* +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.compute.api.* +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener +import org.opendc.compute.service.driver.HostModel +import org.opendc.compute.service.driver.HostState +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer +import java.util.* + +/** + * Test suite for the [ComputeService] interface. 
+ */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class ComputeServiceTest { + lateinit var scope: TestCoroutineScope + lateinit var service: ComputeService + + @BeforeEach + fun setUp() { + scope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(scope) + val tracer = EventTracer(clock) + val policy = AvailableMemoryAllocationPolicy() + service = ComputeService(scope.coroutineContext, clock, tracer, policy) + } + + @AfterEach + fun tearDown() { + scope.cleanupTestCoroutines() + } + + @Test + fun testClientClose() = scope.runBlockingTest { + val client = service.newClient() + + assertEquals(emptyList(), client.queryFlavors()) + assertEquals(emptyList(), client.queryImages()) + assertEquals(emptyList(), client.queryServers()) + + client.close() + + assertThrows { client.queryFlavors() } + assertThrows { client.queryImages() } + assertThrows { client.queryServers() } + + assertThrows { client.findFlavor(UUID.randomUUID()) } + assertThrows { client.findImage(UUID.randomUUID()) } + assertThrows { client.findServer(UUID.randomUUID()) } + + assertThrows { client.newFlavor("test", 1, 2) } + assertThrows { client.newImage("test") } + assertThrows { client.newServer("test", mockk(), mockk()) } + } + + @Test + fun testClientCreate() = scope.runBlockingTest { + val client = service.newClient() + + val flavor = client.newFlavor("test", 1, 1024) + assertEquals(listOf(flavor), client.queryFlavors()) + assertEquals(flavor, client.findFlavor(flavor.uid)) + val image = client.newImage("test") + assertEquals(listOf(image), client.queryImages()) + assertEquals(image, client.findImage(image.uid)) + val server = client.newServer("test", image, flavor, start = false) + assertEquals(listOf(server), client.queryServers()) + assertEquals(server, client.findServer(server.uid)) + + server.delete() + assertNull(client.findServer(server.uid)) + + image.delete() + assertNull(client.findImage(image.uid)) + + flavor.delete() + assertNull(client.findFlavor(flavor.uid)) + + 
assertThrows { server.start() } + } + + @Test + fun testClientOnClose() = scope.runBlockingTest { + service.close() + assertThrows { + service.newClient() + } + } + + @Test + fun testAddHost() = scope.runBlockingTest { + val host = mockk(relaxUnitFun = true) + + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + + assertEquals(0, service.hostCount) + assertEquals(emptySet(), service.hosts) + + service.addHost(host) + + verify(exactly = 1) { host.addListener(any()) } + + assertEquals(1, service.hostCount) + assertEquals(1, service.hosts.size) + + service.removeHost(host) + + verify(exactly = 1) { host.removeListener(any()) } + } + + @Test + fun testAddHostDouble() = scope.runBlockingTest { + val host = mockk(relaxUnitFun = true) + + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.DOWN + + assertEquals(0, service.hostCount) + assertEquals(emptySet(), service.hosts) + + service.addHost(host) + service.addHost(host) + + verify(exactly = 1) { host.addListener(any()) } + } + + @Test + fun testServerStartWithoutEnoughCpus() = scope.runBlockingTest { + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 0) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.ERROR, server.state) + } + + @Test + fun testServerStartWithoutEnoughMemory() = scope.runBlockingTest { + val client = service.newClient() + val flavor = client.newFlavor("test", 0, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.ERROR, server.state) + } + + @Test + fun testServerStartWithoutEnoughResources() = scope.runBlockingTest { + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + 
val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.ERROR, server.state) + } + + @Test + fun testServerCancelRequest() = scope.runBlockingTest { + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + server.stop() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.TERMINATED, server.state) + } + + @Test + fun testServerCannotFitOnHost() = scope.runBlockingTest { + val host = mockk(relaxUnitFun = true) + + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns false + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(10 * 60 * 1000) + server.refresh() + assertEquals(ServerState.PROVISIONING, server.state) + + verify { host.canFit(server) } + } + + @Test + fun testHostAvailableAfterSomeTime() = scope.runBlockingTest { + val host = mockk(relaxUnitFun = true) + val listeners = mutableListOf() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.DOWN + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + every { host.canFit(any()) } returns false + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + + listeners.forEach { it.onStateChanged(host, 
HostState.UP) } + + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.PROVISIONING, server.state) + + verify { host.canFit(server) } + } + + @Test + fun testHostUnavailableAfterSomeTime() = scope.runBlockingTest { + val host = mockk(relaxUnitFun = true) + val listeners = mutableListOf() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + every { host.canFit(any()) } returns false + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + delay(5 * 60 * 1000) + + listeners.forEach { it.onStateChanged(host, HostState.DOWN) } + + server.start() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.PROVISIONING, server.state) + + verify(exactly = 0) { host.canFit(server) } + } + + @Test + fun testServerInvalidType() = scope.runBlockingTest { + val host = mockk(relaxUnitFun = true) + val listeners = mutableListOf() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns true + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + assertThrows { + listeners.forEach { it.onStateChanged(host, server, ServerState.RUNNING) } + } + } + + @Test + fun testServerDeploy() = scope.runBlockingTest { + val host = mockk(relaxUnitFun = true) + val listeners = mutableListOf() + + every { host.uid } returns 
UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns true + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + val slot = slot() + + val watcher = mockk(relaxUnitFun = true) + server.watch(watcher) + + // Start server + server.start() + delay(5 * 60 * 1000) + coVerify { host.spawn(capture(slot), true) } + + listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } + + server.refresh() + assertEquals(ServerState.RUNNING, server.state) + + verify { watcher.onStateChanged(server, ServerState.RUNNING) } + + // Stop server + listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.TERMINATED) } + + server.refresh() + assertEquals(ServerState.TERMINATED, server.state) + + verify { watcher.onStateChanged(server, ServerState.TERMINATED) } + } + + @Test + fun testServerDeployFailure() = scope.runBlockingTest { + val host = mockk(relaxUnitFun = true) + val listeners = mutableListOf() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns true + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + coEvery { host.spawn(any(), true) } throws IllegalStateException() + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + + server.refresh() + assertEquals(ServerState.PROVISIONING, server.state) + } +} diff 
--git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt new file mode 100644 index 00000000..18d698c6 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.service + +import io.mockk.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Test +import org.opendc.compute.api.Flavor +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.internal.InternalFlavor +import java.util.* + +/** + * Test suite for the [InternalFlavor] implementation. + */ +class InternalFlavorTest { + @Test + fun testEquality() { + val service = mockk() + val uid = UUID.randomUUID() + val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + val b = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + + assertEquals(a, b) + } + + @Test + fun testEqualityWithDifferentType() { + val service = mockk() + val uid = UUID.randomUUID() + val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + + val b = mockk(relaxUnitFun = true) + every { b.uid } returns uid + + assertEquals(a, b) + } + + @Test + fun testInequalityWithDifferentType() { + val service = mockk() + val uid = UUID.randomUUID() + val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + + val b = mockk(relaxUnitFun = true) + every { b.uid } returns UUID.randomUUID() + + assertNotEquals(a, b) + } + + @Test + fun testInequalityWithIncorrectType() { + val service = mockk() + val uid = UUID.randomUUID() + val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + + assertNotEquals(a, Unit) + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt new file mode 100644 index 00000000..e1cb0128 --- /dev/null +++ 
b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import io.mockk.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Test +import org.opendc.compute.api.Image +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.internal.InternalFlavor +import org.opendc.compute.service.internal.InternalImage +import java.util.* + +/** + * Test suite for the [InternalImage] implementation. 
+ */ +class InternalImageTest { + @Test + fun testEquality() { + val service = mockk() + val uid = UUID.randomUUID() + val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + val b = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + + assertEquals(a, b) + } + + @Test + fun testEqualityWithDifferentType() { + val service = mockk() + val uid = UUID.randomUUID() + val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + + val b = mockk(relaxUnitFun = true) + every { b.uid } returns uid + + assertEquals(a, b) + } + + @Test + fun testInequalityWithDifferentType() { + val service = mockk() + val uid = UUID.randomUUID() + val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + + val b = mockk(relaxUnitFun = true) + every { b.uid } returns UUID.randomUUID() + + assertNotEquals(a, b) + } + + @Test + fun testInequalityWithIncorrectType() { + val service = mockk() + val uid = UUID.randomUUID() + val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + + assertNotEquals(a, Unit) + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt new file mode 100644 index 00000000..81cb45df --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do 
so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import io.mockk.* +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.internal.InternalFlavor +import org.opendc.compute.service.internal.InternalImage +import org.opendc.compute.service.internal.InternalServer +import java.util.* + +/** + * Test suite for the [InternalServer] implementation. 
+ */ +@OptIn(ExperimentalCoroutinesApi::class) +class InternalServerTest { + @Test + fun testEquality() { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + assertEquals(a, b) + } + + @Test + fun testEqualityWithDifferentType() { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + val b = mockk(relaxUnitFun = true) + every { b.uid } returns uid + + assertEquals(a, b) + } + + @Test + fun testInequalityWithDifferentType() { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + val b = mockk(relaxUnitFun = true) + every { b.uid } returns UUID.randomUUID() + + assertNotEquals(a, b) + } + + @Test + fun testInequalityWithIncorrectType() { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + assertNotEquals(a, Unit) + } + + @Test + fun testStartTerminatedServer() = runBlockingTest { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) } + + server.start() + + verify(exactly = 1) { service.schedule(server) } + assertEquals(ServerState.PROVISIONING, server.state) + } + + @Test + fun testStartDeletedServer() = runBlockingTest { + val service = 
mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.DELETED + + assertThrows { server.start() } + } + + @Test + fun testStartProvisioningServer() = runBlockingTest { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.PROVISIONING + + server.start() + + assertEquals(ServerState.PROVISIONING, server.state) + } + + @Test + fun testStartRunningServer() = runBlockingTest { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.RUNNING + + server.start() + + assertEquals(ServerState.RUNNING, server.state) + } + + @Test + fun testStopProvisioningServer() = runBlockingTest { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val request = ComputeServiceImpl.SchedulingRequest(server) + + every { service.schedule(any()) } returns request + + server.start() + server.stop() + + assertTrue(request.isCancelled) + assertEquals(ServerState.TERMINATED, server.state) + } + + @Test + fun testStopTerminatedServer() = runBlockingTest { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.TERMINATED + server.stop() + + assertEquals(ServerState.TERMINATED, server.state) + } + + @Test + fun testStopDeletedServer() = runBlockingTest { + val service = mockk() + val uid 
= UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.DELETED + server.stop() + + assertEquals(ServerState.DELETED, server.state) + } + + @Test + fun testStopRunningServer() = runBlockingTest { + val service = mockk() + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val host = mockk(relaxUnitFun = true) + + server.state = ServerState.RUNNING + server.host = host + server.stop() + yield() + + coVerify { host.stop(server) } + } + + @Test + fun testDeleteProvisioningServer() = runBlockingTest { + val service = mockk(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val request = ComputeServiceImpl.SchedulingRequest(server) + + every { service.schedule(any()) } returns request + + server.start() + server.delete() + + assertTrue(request.isCancelled) + assertEquals(ServerState.DELETED, server.state) + verify { service.delete(server) } + } + + @Test + fun testDeleteTerminatedServer() = runBlockingTest { + val service = mockk(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.TERMINATED + server.delete() + + assertEquals(ServerState.DELETED, server.state) + + verify { service.delete(server) } + } + + @Test + fun testDeleteDeletedServer() = runBlockingTest { + val service = mockk(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = 
ServerState.DELETED + server.delete() + + assertEquals(ServerState.DELETED, server.state) + } + + @Test + fun testDeleteRunningServer() = runBlockingTest { + val service = mockk(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockk() + val image = mockk() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val host = mockk(relaxUnitFun = true) + + server.state = ServerState.RUNNING + server.host = host + server.delete() + yield() + + coVerify { host.delete(server) } + verify { service.delete(server) } + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt new file mode 100644 index 00000000..db377914 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView +import java.util.* +import java.util.stream.Stream +import kotlin.random.Random + +/** + * Test suite for the [AllocationPolicy] interface. + */ +internal class AllocationPolicyTest { + @ParameterizedTest + @MethodSource("activeServersArgs") + fun testActiveServersPolicy( + reversed: Boolean, + hosts: Set, + server: Server, + expectedHost: HostView? + ) { + val policy = NumberOfActiveServersAllocationPolicy(reversed) + assertEquals(expectedHost, policy.invoke().select(hosts, server)) + } + + @ParameterizedTest + @MethodSource("availableMemoryArgs") + fun testAvailableMemoryPolicy( + reversed: Boolean, + hosts: Set, + server: Server, + expectedHost: HostView? + ) { + val policy = AvailableMemoryAllocationPolicy(reversed) + assertEquals(expectedHost, policy.invoke().select(hosts, server)) + } + + @ParameterizedTest + @MethodSource("availableCoreMemoryArgs") + fun testAvailableCoreMemoryPolicy( + reversed: Boolean, + hosts: Set, + server: Server, + expectedHost: HostView? + ) { + val policy = AvailableMemoryAllocationPolicy(reversed) + assertEquals(expectedHost, policy.invoke().select(hosts, server)) + } + + @ParameterizedTest + @MethodSource("provisionedCoresArgs") + fun testProvisionedPolicy( + reversed: Boolean, + hosts: Set, + server: Server, + expectedHost: HostView? 
+ ) { + val policy = ProvisionedCoresAllocationPolicy(reversed) + assertEquals(expectedHost, policy.invoke().select(hosts, server)) + } + + @Suppress("unused") + private companion object { + /** + * Test arguments for the [NumberOfActiveServersAllocationPolicy]. + */ + @JvmStatic + fun activeServersArgs(): Stream { + val random = Random(1) + val hosts = List(4) { i -> + val view = mockk() + every { view.host.uid } returns UUID(0, i.toLong()) + every { view.host.model.cpuCount } returns random.nextInt(1, 16) + every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024) + every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize) + every { view.numberOfActiveServers } returns random.nextInt(0, 6) + every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount) + every { view.toString() } returns "HostView[$i,numberOfActiveServers=${view.numberOfActiveServers}]" + view + } + + val servers = List(2) { + val server = mockk() + every { server.flavor.cpuCount } returns random.nextInt(1, 8) + every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512) + server + } + + return Stream.of( + Arguments.of(false, hosts.toSet(), servers[0], hosts[2]), + Arguments.of(false, hosts.toSet(), servers[1], hosts[1]), + Arguments.of(true, hosts.toSet(), servers[1], hosts[0]), + ) + } + + /** + * Test arguments for the [AvailableCoreMemoryAllocationPolicy]. 
+ */ + @JvmStatic + fun availableCoreMemoryArgs(): Stream { + val random = Random(1) + val hosts = List(4) { i -> + val view = mockk() + every { view.host.uid } returns UUID(0, i.toLong()) + every { view.host.model.cpuCount } returns random.nextInt(1, 16) + every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024) + every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize) + every { view.numberOfActiveServers } returns random.nextInt(0, 6) + every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount) + every { view.toString() } returns "HostView[$i,availableMemory=${view.availableMemory}]" + view + } + + val servers = List(2) { + val server = mockk() + every { server.flavor.cpuCount } returns random.nextInt(1, 8) + every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512) + server + } + + return Stream.of( + Arguments.of(false, hosts.toSet(), servers[0], hosts[2]), + Arguments.of(false, hosts.toSet(), servers[1], hosts[2]), + Arguments.of(true, hosts.toSet(), servers[1], hosts[1]), + ) + } + + /** + * Test arguments for the [AvailableMemoryAllocationPolicy]. 
+ */ + @JvmStatic + fun availableMemoryArgs(): Stream { + val random = Random(1) + val hosts = List(4) { i -> + val view = mockk() + every { view.host.uid } returns UUID(0, i.toLong()) + every { view.host.model.cpuCount } returns random.nextInt(1, 16) + every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024) + every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize) + every { view.numberOfActiveServers } returns random.nextInt(0, 6) + every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount) + every { view.toString() } returns "HostView[$i,availableMemory=${view.availableMemory}]" + view + } + + val servers = List(2) { + val server = mockk() + every { server.flavor.cpuCount } returns random.nextInt(1, 8) + every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512) + server + } + + return Stream.of( + Arguments.of(false, hosts.toSet(), servers[0], hosts[2]), + Arguments.of(false, hosts.toSet(), servers[1], hosts[2]), + Arguments.of(true, hosts.toSet(), servers[1], hosts[1]), + ) + } + + /** + * Test arguments for the [ProvisionedCoresAllocationPolicy]. 
+ */ + @JvmStatic + fun provisionedCoresArgs(): Stream { + val random = Random(1) + val hosts = List(4) { i -> + val view = mockk() + every { view.host.uid } returns UUID(0, i.toLong()) + every { view.host.model.cpuCount } returns random.nextInt(1, 16) + every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024) + every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize) + every { view.numberOfActiveServers } returns random.nextInt(0, 6) + every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount) + every { view.toString() } returns "HostView[$i,provisionedCores=${view.provisionedCores}]" + view + } + + val servers = List(2) { + val server = mockk() + every { server.flavor.cpuCount } returns random.nextInt(1, 8) + every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512) + server + } + + return Stream.of( + Arguments.of(false, hosts.toSet(), servers[0], hosts[2]), + Arguments.of(false, hosts.toSet(), servers[1], hosts[0]), + Arguments.of(true, hosts.toSet(), servers[1], hosts[0]), + ) + } + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml b/simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml new file mode 100644 index 00000000..0dfb75f2 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + + + diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index f9c96bb6..66f07d97 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ 
-28,13 +28,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging -import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy -import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy -import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy -import org.opendc.compute.service.scheduler.RandomAllocationPolicy -import org.opendc.compute.simulator.allocation.* +import org.opendc.compute.service.scheduler.* import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index b9aeecb8..560319ee 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -45,7 +45,6 @@ import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy import org.opendc.compute.service.scheduler.RandomAllocationPolicy -import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.capelin.attachMonitor import org.opendc.experiments.capelin.createComputeService import org.opendc.experiments.capelin.createFailureDomain -- cgit v1.2.3 From 0c5f0a07f87f85119be83b923ffe60b3863ebb9d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 25 Mar 2021 14:33:56 +0100 Subject: compute: Hide internals of compute service 
implementation This change updates the compute service and its users so that they no longer rely on the internals of `ComputeServiceImpl` and instead use its public API. --- .../compute/service/internal/ComputeServiceImpl.kt | 12 +-- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 2 + .../experiments/capelin/ExperimentHelpers.kt | 92 ++++++++++++---------- .../org/opendc/experiments/capelin/Portfolio.kt | 13 ++- .../experiments/capelin/CapelinIntegrationTest.kt | 19 +++-- .../src/main/kotlin/org/opendc/runner/web/Main.kt | 9 +-- 6 files changed, 75 insertions(+), 72 deletions(-) (limited to 'simulator') diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 62808b4d..18255922 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -47,7 +47,7 @@ import kotlin.math.max * @param context The [CoroutineContext] to use. * @param clock The clock instance to keep track of time. 
*/ -public class ComputeServiceImpl( +internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, private val tracer: EventTracer, @@ -104,11 +104,11 @@ public class ComputeServiceImpl( */ private val servers = mutableMapOf() - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var finishedVms: Int = 0 - public var unscheduledVms: Int = 0 + private var submittedVms: Int = 0 + private var queuedVms: Int = 0 + private var runningVms: Int = 0 + private var finishedVms: Int = 0 + private var unscheduledVms: Int = 0 private var maxCores = 0 private var maxMemory = 0L diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 3c4b4410..2e4191cc 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -207,6 +207,8 @@ public class SimHost( _state = HostState.DOWN } + override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" + /** * Convert flavor to machine model. 
*/ diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 44436019..1fdd45ac 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -22,15 +22,10 @@ package org.opendc.experiments.capelin -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService @@ -39,7 +34,6 @@ import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostEvent import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -54,6 +48,7 @@ import org.opendc.simulator.failures.FaultInjector import org.opendc.trace.core.EventTracer import java.io.File import java.time.Clock +import kotlin.coroutines.resume import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -142,7 +137,7 @@ public fun createComputeService( environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, eventTracer: EventTracer -): 
ComputeServiceImpl { +): ComputeService { val hosts = environmentReader .use { it.read() } .map { def -> @@ -159,7 +154,7 @@ public fun createComputeService( } val scheduler = - ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) for (host in hosts) { scheduler.addHost(host) @@ -177,7 +172,8 @@ public fun attachMonitor( clock: Clock, scheduler: ComputeService, monitor: ExperimentMonitor -) { +): MonitorResults { + val results = MonitorResults() // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -213,18 +209,33 @@ public fun attachMonitor( scheduler.events .onEach { event -> when (event) { - is ComputeServiceEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> { + results.submittedVms = event.totalVmCount + results.queuedVms = event.waitingVmCount + results.runningVms = event.activeVmCount + results.finishedVms = event.inactiveVmCount + results.unscheduledVms = event.failedVmCount monitor.reportProvisionerMetrics(clock.millis(), event) + } } } .launchIn(coroutineScope) + + return results +} + +public class MonitorResults { + public var submittedVms: Int = 0 + public var queuedVms: Int = 0 + public var runningVms: Int = 0 + public var finishedVms: Int = 0 + public var unscheduledVms: Int = 0 } /** * Process the trace. 
*/ public suspend fun processTrace( - coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader, scheduler: ComputeService, @@ -234,43 +245,40 @@ public suspend fun processTrace( val client = scheduler.newClient() val image = client.newImage("vm-image") try { - var submitted = 0 + coroutineScope { + while (reader.hasNext()) { + val entry = reader.next() - while (reader.hasNext()) { - val entry = reader.next() - - submitted++ - delay(max(0, entry.start - clock.millis())) - coroutineScope.launch { - chan.send(Unit) - val server = client.newServer( - entry.name, - image, - client.newFlavor( + delay(max(0, entry.start - clock.millis())) + launch { + chan.send(Unit) + val server = client.newServer( entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta - ) + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + ) + + suspendCancellableCoroutine { cont -> + server.watch(object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor.reportVmStateChange(clock.millis(), server, newState) - server.watch(object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor.reportVmStateChange(clock.millis(), server, newState) + if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) { + cont.resume(Unit) + } + } + }) } - }) + } } } - scheduler.events - .takeWhile { - when (it) { - is ComputeServiceEvent.MetricsAvailable -> - it.inactiveVmCount + it.failedVmCount != submitted - } - } - .collect() - delay(1) + yield() } finally { reader.close() client.close() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 66f07d97..5a7eadad 100644 --- 
a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -169,9 +169,8 @@ public abstract class Portfolio(name: String) : Experiment(name) { null } - attachMonitor(this, clock, scheduler, monitor) + val monitorResults = attachMonitor(this, clock, scheduler, monitor) processTrace( - this, clock, trace, scheduler, @@ -179,11 +178,11 @@ public abstract class Portfolio(name: String) : Experiment(name) { monitor ) - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") + logger.debug("SUBMIT=${monitorResults.submittedVms}") + logger.debug("FAIL=${monitorResults.unscheduledVms}") + logger.debug("QUEUED=${monitorResults.queuedVms}") + logger.debug("RUNNING=${monitorResults.runningVms}") + logger.debug("FINISHED=${monitorResults.finishedVms}") failureDomain?.cancel() scheduler.close() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a812490a..c16f7003 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -33,8 +33,8 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.ComputeService import 
org.opendc.compute.service.driver.Host -import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -94,7 +94,8 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: ComputeServiceImpl + lateinit var scheduler: ComputeService + lateinit var monitorResults: MonitorResults val tracer = EventTracer(clock) testScope.launch { @@ -120,9 +121,8 @@ class CapelinIntegrationTest { null } - attachMonitor(this, clock, scheduler, monitor) + monitorResults = attachMonitor(this, clock, scheduler, monitor) processTrace( - this, clock, traceReader, scheduler, @@ -130,7 +130,7 @@ class CapelinIntegrationTest { monitor ) - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") failureDomain?.cancel() scheduler.close() @@ -141,8 +141,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, + { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, + { assertEquals(50, monitorResults.finishedVms, "All VMs should finish after a run") }, { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, { assertEquals(435179794565, monitor.totalGrantedBurst) 
{ "Incorrect granted burst" } }, { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, @@ -167,9 +167,8 @@ class CapelinIntegrationTest { allocationPolicy, tracer ) - attachMonitor(this, clock, scheduler, monitor) + val monitorResults = attachMonitor(this, clock, scheduler, monitor) processTrace( - this, clock, traceReader, scheduler, @@ -179,7 +178,7 @@ class CapelinIntegrationTest { yield() - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") scheduler.close() monitor.close() diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 560319ee..9c92bbf8 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -268,9 +268,8 @@ public class RunnerCli : CliktCommand(name = "runner") { null } - attachMonitor(this, clock, scheduler, monitor) + val monitorResults = attachMonitor(this, clock, scheduler, monitor) processTrace( - this, clock, trace, scheduler, @@ -278,11 +277,7 @@ public class RunnerCli : CliktCommand(name = "runner") { monitor ) - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} 
FINISH=${monitorResults.finishedVms}" } failureDomain?.cancel() scheduler.close() -- cgit v1.2.3 From 0d66ef47d6e1ec0861b4939800c5070f96600ca0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 25 Mar 2021 14:49:07 +0100 Subject: compute: Remove dependency on event tracer This change removes the dependency on the event tracer in `opendc-trace`, since we are in the process of migrating OpenDC to use OpenTelemetry for distributed tracing and metrics. --- .../opendc-compute-service/build.gradle.kts | 1 - .../org/opendc/compute/service/ComputeService.kt | 5 +--- .../service/events/HypervisorAvailableEvent.kt | 31 --------------------- .../service/events/HypervisorUnavailableEvent.kt | 31 --------------------- .../compute/service/events/VmScheduledEvent.kt | 30 -------------------- .../compute/service/events/VmStoppedEvent.kt | 30 -------------------- .../compute/service/events/VmSubmissionEvent.kt | 32 ---------------------- .../service/events/VmSubmissionInvalidEvent.kt | 30 -------------------- .../compute/service/internal/ComputeServiceImpl.kt | 14 ---------- .../opendc/compute/service/ComputeServiceTest.kt | 4 +-- .../experiments/capelin/ExperimentHelpers.kt | 6 ++-- .../org/opendc/experiments/capelin/Portfolio.kt | 5 +--- .../experiments/capelin/CapelinIntegrationTest.kt | 9 ++---- .../sc18/UnderspecificationExperiment.kt | 1 - .../src/main/kotlin/org/opendc/runner/web/Main.kt | 5 +--- .../StageWorkflowSchedulerIntegrationTest.kt | 2 +- 16 files changed, 9 insertions(+), 227 deletions(-) delete mode 100644 simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt delete mode 100644 simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt delete mode 100644 simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt delete mode 100644 
simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt delete mode 100644 simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt delete mode 100644 simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt (limited to 'simulator') diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts index 1825e989..41b506b2 100644 --- a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts @@ -32,7 +32,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 593e4b56..28cef83a 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -27,7 +27,6 @@ import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.trace.core.EventTracer import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -76,17 +75,15 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. 
* @param clock The clock instance to use. - * @param tracer The event tracer to use. * @param allocationPolicy The allocation policy to use. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - tracer: EventTracer, allocationPolicy: AllocationPolicy, schedulingQuantum: Long = 300000, ): ComputeService { - return ComputeServiceImpl(context, clock, tracer, allocationPolicy, schedulingQuantum) + return ComputeServiceImpl(context, clock, allocationPolicy, schedulingQuantum) } } } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt deleted file mode 100644 index a7974062..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event -import java.util.* - -/** - * This event is emitted when a hypervisor has become available. - */ -public class HypervisorAvailableEvent(public val uid: UUID) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt deleted file mode 100644 index 75bb09ed..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event -import java.util.* - -/** - * This event is emitted when a hypervisor has become unavailable. - */ -public class HypervisorUnavailableEvent(public val uid: UUID) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt deleted file mode 100644 index f59c74b7..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event - -/** - * This event is emitted when a virtual machine has successfully been scheduled on a hypervisor. - */ -public class VmScheduledEvent(public val name: String) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt deleted file mode 100644 index eaf0736b..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event - -/** - * This event is emitted when a virtual machine has stopped running. - */ -public class VmStoppedEvent(public val name: String) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt deleted file mode 100644 index fa0a8a13..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.trace.core.Event - -/** - * This event is emitted when a virtual machine is submitted to the provisioning service. - */ -public class VmSubmissionEvent(public val name: String, public val image: Image, public val flavor: Flavor) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt deleted file mode 100644 index 52b91616..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event - -/** - * An event that is emitted when the submission is deemed to be invalid. - */ -public class VmSubmissionInvalidEvent(public val name: String) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 18255922..f9bd7fbc 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -31,9 +31,7 @@ import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.events.* import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.trace.core.EventTracer import org.opendc.utils.TimerScheduler import org.opendc.utils.flow.EventFlow import java.time.Clock @@ -50,7 +48,6 @@ import kotlin.math.max internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - private val tracer: EventTracer, private val allocationPolicy: AllocationPolicy, private val 
schedulingQuantum: Long ) : ComputeService, HostListener { @@ -207,8 +204,6 @@ internal class ComputeServiceImpl( start: Boolean ): Server { check(!isClosed) { "Client is closed" } - tracer.commit(VmSubmissionEvent(name, image, flavor)) - _events.emit( ComputeServiceEvent.MetricsAvailable( this@ComputeServiceImpl, @@ -346,8 +341,6 @@ internal class ComputeServiceImpl( logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" } if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { - tracer.commit(VmSubmissionInvalidEvent(server.name)) - _events.emit( ComputeServiceEvent.MetricsAvailable( this@ComputeServiceImpl, @@ -392,7 +385,6 @@ internal class ComputeServiceImpl( host.spawn(server) activeServers[server] = host - tracer.commit(VmScheduledEvent(server.name)) _events.emit( ComputeServiceEvent.MetricsAvailable( this@ComputeServiceImpl, @@ -437,8 +429,6 @@ internal class ComputeServiceImpl( availableHosts += hv } - tracer.commit(HypervisorAvailableEvent(host.uid)) - _events.emit( ComputeServiceEvent.MetricsAvailable( this@ComputeServiceImpl, @@ -461,8 +451,6 @@ internal class ComputeServiceImpl( val hv = hostToView[host] ?: return availableHosts -= hv - tracer.commit(HypervisorUnavailableEvent(hv.uid)) - _events.emit( ComputeServiceEvent.MetricsAvailable( this@ComputeServiceImpl, @@ -495,8 +483,6 @@ internal class ComputeServiceImpl( if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." 
} - tracer.commit(VmStoppedEvent(server.name)) - _events.emit( ComputeServiceEvent.MetricsAvailable( this@ComputeServiceImpl, diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index ffec92ea..e1482152 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -40,7 +40,6 @@ import org.opendc.compute.service.driver.HostModel import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer import java.util.* /** @@ -55,9 +54,8 @@ internal class ComputeServiceTest { fun setUp() { scope = TestCoroutineScope() val clock = DelayControllerClockAdapter(scope) - val tracer = EventTracer(clock) val policy = AvailableMemoryAllocationPolicy() - service = ComputeService(scope.coroutineContext, clock, tracer, policy) + service = ComputeService(scope.coroutineContext, clock, policy) } @AfterEach diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 1fdd45ac..6f99a44e 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -45,7 +45,6 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import 
org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector -import org.opendc.trace.core.EventTracer import java.io.File import java.time.Clock import kotlin.coroutines.resume @@ -135,8 +134,7 @@ public fun createComputeService( coroutineScope: CoroutineScope, clock: Clock, environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy, - eventTracer: EventTracer + allocationPolicy: AllocationPolicy ): ComputeService { val hosts = environmentReader .use { it.read() } @@ -154,7 +152,7 @@ public fun createComputeService( } val scheduler = - ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) + ComputeService(coroutineScope.coroutineContext, clock, allocationPolicy) for (host in hosts) { scheduler.addHost(host) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 5a7eadad..46e0bcb9 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -41,7 +41,6 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer import java.io.File import java.util.concurrent.ConcurrentHashMap import kotlin.random.Random @@ -114,7 +113,6 @@ public abstract class Portfolio(name: String) : Experiment(name) { override fun doRun(repeat: Int) { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) val 
seeder = Random(repeat) val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) @@ -151,8 +149,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { this, clock, environment, - allocationPolicy, - tracer + allocationPolicy ) val failureDomain = if (operationalPhenomena.failureFrequency > 0) { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index c16f7003..a836b334 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -45,7 +45,6 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer import java.io.File import java.time.Clock @@ -96,15 +95,13 @@ class CapelinIntegrationTest { val environmentReader = createTestEnvironmentReader() lateinit var scheduler: ComputeService lateinit var monitorResults: MonitorResults - val tracer = EventTracer(clock) testScope.launch { scheduler = createComputeService( this, clock, environmentReader, - allocationPolicy, - tracer + allocationPolicy ) val failureDomain = if (failures) { @@ -157,15 +154,13 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - val tracer = EventTracer(clock) testScope.launch { val scheduler = createComputeService( this, clock, 
environmentReader, - allocationPolicy, - tracer + allocationPolicy ) val monitorResults = attachMonitor(this, clock, scheduler, monitor) processTrace( diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 9e305b3d..98e25be9 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -100,7 +100,6 @@ public class UnderspecificationExperiment : Experiment("underspecification") { val compute = ComputeService( testScope.coroutineContext, clock, - tracer, NumberOfActiveServersAllocationPolicy(), ) diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 9c92bbf8..68ea3fb9 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -54,7 +54,6 @@ import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer import java.io.File import kotlin.coroutines.coroutineContext import kotlin.random.Random @@ -243,15 +242,13 @@ public class RunnerCli : CliktCommand(name = "runner") { val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) val environment = TopologyParser(topologies, topologyId) val monitor = WebExperimentMonitor() - val tracer = 
EventTracer(clock) testScope.launch { val scheduler = createComputeService( this, clock, environment, - allocationPolicy, - tracer + allocationPolicy ) val failureDomain = if (operational.getBoolean("failuresEnabled")) { diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt index 2161f5f2..91b22266 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt @@ -85,7 +85,7 @@ internal class StageWorkflowSchedulerIntegrationTest { ) } - val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + val compute = ComputeService(testScope.coroutineContext, clock, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) hosts.forEach { compute.addHost(it) } -- cgit v1.2.3 From 608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 25 Mar 2021 21:50:45 +0100 Subject: compute: Integrate OpenTelemetry Metrics in OpenDC Compute This change integrates the OpenTelemetry Metrics API in the OpenDC Compute Service implementation. This replaces the old infrastructure for gathering metrics. 
--- simulator/buildSrc/src/main/kotlin/Versions.kt | 5 + simulator/gradle.properties | 6 + .../opendc-compute-service/build.gradle.kts | 1 + .../org/opendc/compute/service/ComputeService.kt | 9 +- .../opendc/compute/service/ComputeServiceEvent.kt | 47 ------ .../compute/service/internal/ComputeServiceImpl.kt | 174 ++++++++++----------- .../opendc/compute/service/ComputeServiceTest.kt | 4 +- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 2 +- .../opendc-experiments-capelin/build.gradle.kts | 2 + .../experiments/capelin/ExperimentHelpers.kt | 131 ++++++++++++---- .../org/opendc/experiments/capelin/Portfolio.kt | 59 +++---- .../capelin/monitor/ExperimentMonitor.kt | 12 +- .../capelin/monitor/ParquetExperimentMonitor.kt | 26 +-- .../experiments/capelin/CapelinIntegrationTest.kt | 129 ++++++--------- .../sc18/UnderspecificationExperiment.kt | 2 + simulator/opendc-runner-web/build.gradle.kts | 2 + .../src/main/kotlin/org/opendc/runner/web/Main.kt | 161 +++++++++---------- .../org/opendc/runner/web/WebExperimentMonitor.kt | 22 ++- .../opendc/simulator/compute/SimAbstractMachine.kt | 7 +- .../simulator/compute/SimBareMetalMachine.kt | 1 + simulator/opendc-telemetry/build.gradle.kts | 21 +++ .../opendc-telemetry-api/build.gradle.kts | 34 ++++ .../opendc-telemetry-sdk/build.gradle.kts | 38 +++++ .../org/opendc/telemetry/sdk/OtelClockAdapter.kt | 39 +++++ .../sdk/metrics/export/CoroutineMetricReader.kt | 99 ++++++++++++ .../StageWorkflowSchedulerIntegrationTest.kt | 4 +- simulator/settings.gradle.kts | 2 + 27 files changed, 646 insertions(+), 393 deletions(-) delete mode 100644 simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt create mode 100644 simulator/opendc-telemetry/build.gradle.kts create mode 100644 simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts create mode 100644 simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts create mode 100644 
simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt create mode 100644 simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt (limited to 'simulator') diff --git a/simulator/buildSrc/src/main/kotlin/Versions.kt b/simulator/buildSrc/src/main/kotlin/Versions.kt index d1df6284..6aa9260b 100644 --- a/simulator/buildSrc/src/main/kotlin/Versions.kt +++ b/simulator/buildSrc/src/main/kotlin/Versions.kt @@ -49,6 +49,11 @@ public class Versions(private val project: Project) { val kotlinxCoroutines by version(name = "kotlinx-coroutines") + val otelApi by version(name = "opentelemetry-api") + val otelApiMetrics by version(name = "opentelemetry-api-metrics") + val otelSdk by version(name = "opentelemetry-sdk") + val otelSdkMetrics by version(name = "opentelemetry-sdk-metrics") + /** * Obtain the version for the specified [dependency][name]. diff --git a/simulator/gradle.properties b/simulator/gradle.properties index 8d41408c..99b08bb2 100644 --- a/simulator/gradle.properties +++ b/simulator/gradle.properties @@ -28,6 +28,12 @@ kotlin-logging.version = 2.0.6 slf4j.version = 1.7.30 log4j.version = 2.14.1 +# Dependencies (Telemetry) +opentelemetry-api.version = 1.0.1 +opentelemetry-api-metrics.version = 1.0.1-alpha +opentelemetry-sdk.version = 1.0.1 +opentelemetry-sdk-metrics.version = 1.0.1-alpha + # Dependencies (CLI) clikt.version = 3.1.0 progressbar.version = 0.9.0 diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts index 41b506b2..909e2dcd 100644 --- a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts @@ -32,6 +32,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-compute:opendc-compute-api")) + 
api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 28cef83a..4bc0ba78 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -22,6 +22,7 @@ package org.opendc.compute.service +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host @@ -34,11 +35,6 @@ import kotlin.coroutines.CoroutineContext * The [ComputeService] hosts the API implementation of the OpenDC Compute service. */ public interface ComputeService : AutoCloseable { - /** - * The events emitted by the service. - */ - public val events: Flow - /** * The hosts that are used by the compute service. 
*/ @@ -80,10 +76,11 @@ public interface ComputeService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, + meter: Meter, allocationPolicy: AllocationPolicy, schedulingQuantum: Long = 300000, ): ComputeService { - return ComputeServiceImpl(context, clock, allocationPolicy, schedulingQuantum) + return ComputeServiceImpl(context, clock, meter, allocationPolicy, schedulingQuantum) } } } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt deleted file mode 100644 index 193008a7..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service - -/** - * An event that is emitted by the [ComputeService]. - */ -public sealed class ComputeServiceEvent { - /** - * The service that has emitted the event. - */ - public abstract val provisioner: ComputeService - - /** - * An event emitted for writing metrics. - */ - public data class MetricsAvailable( - override val provisioner: ComputeService, - public val totalHostCount: Int, - public val availableHostCount: Int, - public val totalVmCount: Int, - public val activeVmCount: Int, - public val inactiveVmCount: Int, - public val waitingVmCount: Int, - public val failedVmCount: Int - ) : ComputeServiceEvent() -} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index f9bd7fbc..26a34ad9 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,18 +22,16 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import 
org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.utils.TimerScheduler -import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext @@ -48,6 +46,7 @@ import kotlin.math.max internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, + private val meter: Meter, private val allocationPolicy: AllocationPolicy, private val schedulingQuantum: Long ) : ComputeService, HostListener { @@ -101,24 +100,70 @@ internal class ComputeServiceImpl( */ private val servers = mutableMapOf() - private var submittedVms: Int = 0 - private var queuedVms: Int = 0 - private var runningVms: Int = 0 - private var finishedVms: Int = 0 - private var unscheduledVms: Int = 0 - private var maxCores = 0 private var maxMemory = 0L + /** + * The number of servers that have been submitted to the service for provisioning. + */ + private val _submittedServers = meter.longCounterBuilder("servers.submitted") + .setDescription("Number of start requests") + .setUnit("1") + .build() + + /** + * The number of servers that failed to be scheduled. + */ + private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled") + .setDescription("Number of unscheduled servers") + .setUnit("1") + .build() + + /** + * The number of servers that are waiting to be provisioned. + */ + private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting") + .setDescription("Number of servers waiting to be provisioned") + .setUnit("1") + .build() + + /** + * The number of servers that are currently running. + */ + private val _runningServers = meter.longUpDownCounterBuilder("servers.active") + .setDescription("Number of servers currently running") + .setUnit("1") + .build() + + /** + * The number of servers that have finished running.
+ */ + private val _finishedServers = meter.longCounterBuilder("servers.finished") + .setDescription("Number of servers that finished running") + .setUnit("1") + .build() + + /** + * The number of hosts registered at the compute service. + */ + private val _hostCount = meter.longUpDownCounterBuilder("hosts.total") + .setDescription("Number of hosts") + .setUnit("1") + .build() + + /** + * The number of available hosts registered at the compute service. + */ + private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available") + .setDescription("Number of available hosts") + .setUnit("1") + .build() + /** * The allocation logic to use. */ private val allocationLogic = allocationPolicy() - override val events: Flow - get() = _events - private val _events = EventFlow() - /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ @@ -204,18 +249,6 @@ internal class ComputeServiceImpl( start: Boolean ): Server { check(!isClosed) { "Client is closed" } - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms - ) - ) val uid = UUID(clock.millis(), random.nextLong()) val server = InternalServer( @@ -269,14 +302,23 @@ internal class ComputeServiceImpl( hostToView[host] = hv if (host.state == HostState.UP) { + _availableHostCount.add(1) availableHosts += hv } + _hostCount.add(1) host.addListener(this) } override fun removeHost(host: Host) { - host.removeListener(this) + val view = hostToView.remove(host) + if (view != null) { + if (availableHosts.remove(view)) { + _availableHostCount.add(-1) + } + host.removeListener(this) + _hostCount.add(-1) + } } override fun close() { @@ -288,6 +330,8 @@ internal class ComputeServiceImpl( val request = SchedulingRequest(server) queue.add(request) + _submittedServers.add(1) + _waitingServers.add(1) requestSchedulingCycle() return request } @@ -332,6 +376,7 @@ internal 
class ComputeServiceImpl( if (request.isCancelled) { queue.poll() + _waitingServers.add(-1) continue } @@ -341,21 +386,10 @@ internal class ComputeServiceImpl( logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" } if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - runningVms, - finishedVms, - --queuedVms, - ++unscheduledVms - ) - ) - // Remove the incoming image queue.poll() + _waitingServers.add(-1) + _unscheduledServers.add(1) logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") @@ -370,6 +404,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() + _waitingServers.add(-1) logger.info { "Assigned server $server to host $host." } @@ -384,19 +419,6 @@ internal class ComputeServiceImpl( server.host = host host.spawn(server) activeServers[server] = host - - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) - ) } catch (e: Throwable) { logger.error("Failed to deploy VM", e) @@ -427,21 +449,9 @@ internal class ComputeServiceImpl( if (hv != null) { // Corner case for when the hypervisor already exists availableHosts += hv + _availableHostCount.add(1) } - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - // Re-schedule on the new machine requestSchedulingCycle() } @@ -450,19 +460,7 @@ internal class ComputeServiceImpl( val hv = hostToView[host] ?: return availableHosts -= hv - - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - 
runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) + _availableHostCount.add(-1) requestSchedulingCycle() } @@ -480,23 +478,15 @@ internal class ComputeServiceImpl( server.state = newState - if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { + if (newState == ServerState.RUNNING) { + _runningServers.add(1) + } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - ) - ) - activeServers -= server + _runningServers.add(-1) + _finishedServers.add(1) + val hv = hostToView[host] if (hv != null) { hv.provisionedCores -= server.flavor.cpuCount diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index e1482152..45a306aa 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -23,6 +23,7 @@ package org.opendc.compute.service import io.mockk.* +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.test.TestCoroutineScope @@ -55,7 +56,8 @@ internal class ComputeServiceTest { scope = TestCoroutineScope() val clock = DelayControllerClockAdapter(scope) val policy = AvailableMemoryAllocationPolicy() - service = ComputeService(scope.coroutineContext, clock, policy) + val meter = MeterProvider.noop().get("opendc-compute") + service = 
ComputeService(scope.coroutineContext, clock, meter, policy) } @AfterEach diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 2e4191cc..89784803 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -59,7 +59,7 @@ public class SimHost( /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ - override val scope: CoroutineScope = CoroutineScope(context) + override val scope: CoroutineScope = CoroutineScope(context + Job()) /** * The logger instance of this server. diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 2d0da1bf..b2d7cc30 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -47,4 +47,6 @@ dependencies { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } + + implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 6f99a44e..4f48bba7 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -22,6 +22,11 @@ package 
org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.launchIn @@ -29,7 +34,6 @@ import kotlinx.coroutines.flow.onEach import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostEvent import org.opendc.compute.service.driver.HostListener @@ -45,8 +49,10 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Clock +import kotlin.coroutines.coroutineContext import kotlin.coroutines.resume import kotlin.math.ln import kotlin.math.max @@ -130,12 +136,13 @@ public fun createTraceReader( /** * Construct the environment for a simulated compute service.. 
*/ -public fun createComputeService( - coroutineScope: CoroutineScope, +public suspend fun withComputeService( clock: Clock, + meter: Meter, environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy -): ComputeService { + allocationPolicy: AllocationPolicy, + block: suspend CoroutineScope.(ComputeService) -> Unit +): Unit = coroutineScope { val hosts = environmentReader .use { it.read() } .map { def -> @@ -144,7 +151,7 @@ public fun createComputeService( def.name, def.model, def.meta, - coroutineScope.coroutineContext, + coroutineContext, clock, SimFairShareHypervisorProvider(), def.powerModel @@ -152,26 +159,33 @@ public fun createComputeService( } val scheduler = - ComputeService(coroutineScope.coroutineContext, clock, allocationPolicy) + ComputeService(coroutineContext, clock, meter, allocationPolicy) for (host in hosts) { scheduler.addHost(host) } - return scheduler + try { + block(this, scheduler) + } finally { + scheduler.close() + hosts.forEach(SimHost::close) + } } /** * Attach the specified monitor to the VM provisioner. 
*/ @OptIn(ExperimentalCoroutinesApi::class) -public fun attachMonitor( - coroutineScope: CoroutineScope, +public suspend fun withMonitor( + monitor: ExperimentMonitor, clock: Clock, + metricProducer: MetricProducer, scheduler: ComputeService, - monitor: ExperimentMonitor -): MonitorResults { - val results = MonitorResults() + block: suspend CoroutineScope.() -> Unit +): Unit = coroutineScope { + val monitorJobs = mutableSetOf() + // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -181,7 +195,7 @@ public fun attachMonitor( } }) - host.events + monitorJobs += host.events .onEach { event -> when (event) { is HostEvent.SliceFinished -> monitor.reportHostSlice( @@ -197,37 +211,81 @@ public fun attachMonitor( ) } } - .launchIn(coroutineScope) + .launchIn(this) - (host as SimHost).machine.powerDraw + monitorJobs += (host as SimHost).machine.powerDraw .onEach { monitor.reportPowerConsumption(host, it) } - .launchIn(coroutineScope) + .launchIn(this) } - scheduler.events - .onEach { event -> - when (event) { - is ComputeServiceEvent.MetricsAvailable -> { - results.submittedVms = event.totalVmCount - results.queuedVms = event.waitingVmCount - results.runningVms = event.activeVmCount - results.finishedVms = event.inactiveVmCount - results.unscheduledVms = event.failedVmCount - monitor.reportProvisionerMetrics(clock.millis(), event) - } + val reader = CoroutineMetricReader( + this, listOf(metricProducer), + object : MetricExporter { + override fun export(metrics: Collection): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + + val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val runningVms = 
metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + + monitor.reportProvisionerMetrics( + clock.millis(), + hosts, + availableHosts, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + return CompletableResultCode.ofSuccess() } - } - .launchIn(coroutineScope) - return results + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + }, + exportInterval = 5 * 60 * 1000 + ) + + try { + block(this) + } finally { + monitorJobs.forEach(Job::cancel) + reader.close() + monitor.close() + } } -public class MonitorResults { +public class ComputeMetrics { public var submittedVms: Int = 0 public var queuedVms: Int = 0 public var runningVms: Int = 0 - public var finishedVms: Int = 0 public var unscheduledVms: Int = 0 + public var finishedVms: Int = 0 +} + +/** + * Collect the metrics of the compute service. 
+ */ +public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { + val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + val res = ComputeMetrics() + try { + // Hack to extract metrics from OpenTelemetry SDK + res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + } catch (cause: Throwable) { + logger.warn(cause) { "Failed to collect metrics" } + } + return res } /** @@ -242,12 +300,17 @@ public suspend fun processTrace( ) { val client = scheduler.newClient() val image = client.newImage("vm-image") + var offset = Long.MIN_VALUE try { coroutineScope { while (reader.hasNext()) { val entry = reader.next() - delay(max(0, entry.start - clock.millis())) + if (offset < 0) { + offset = entry.start - clock.millis() + } + + delay(max(0, (entry.start - offset) - clock.millis())) launch { chan.send(Unit) val server = client.newServer( diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 46e0bcb9..2921daba 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -22,11 +22,13 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider 
+import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.opendc.compute.service.scheduler.* import org.opendc.experiments.capelin.model.CompositeWorkload @@ -41,6 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.util.concurrent.ConcurrentHashMap import kotlin.random.Random @@ -110,15 +113,21 @@ public abstract class Portfolio(name: String) : Experiment(name) { * Perform a single trial for this portfolio. */ @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int) { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) + override fun doRun(repeat: Int): Unit = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seeder = Random(repeat) val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) val chan = Channel(Channel.CONFLATED) val allocationPolicy = createAllocationPolicy(seeder) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val meter = meterProvider.get("opendc-compute") + val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } @@ -144,14 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environment, - allocationPolicy - ) - + withComputeService(clock, meter, environment, 
allocationPolicy) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( @@ -166,30 +168,21 @@ public abstract class Portfolio(name: String) : Experiment(name) { null } - val monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - trace, - scheduler, - chan, - monitor - ) - - logger.debug("SUBMIT=${monitorResults.submittedVms}") - logger.debug("FAIL=${monitorResults.unscheduledVms}") - logger.debug("QUEUED=${monitorResults.queuedVms}") - logger.debug("RUNNING=${monitorResults.runningVms}") - logger.debug("FINISHED=${monitorResults.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + trace, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() } - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } + val monitorResults = collectMetrics(meterProvider as MetricProducer) + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } } /** diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 14cc06dc..a57c8d78 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import 
org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import java.io.Closeable @@ -68,5 +67,14 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. */ - public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index c9d57a98..0e675d87 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.telemetry.HostEvent @@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } } - override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { + override fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) { 
provisionerWriter.write( ProvisionerEvent( time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount + totalHostCount, + availableHostCount, + totalVmCount, + activeVmCount, + inactiveVmCount, + waitingVmCount, + failedVmCount ) ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a836b334..fd906f4d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,18 +22,18 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.yield -import org.junit.jupiter.api.AfterEach +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.model.Workload @@ -45,24 +45,14 @@ import 
org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock import java.io.File -import java.time.Clock /** * An integration test suite for the SC20 experiments. */ @OptIn(ExperimentalCoroutinesApi::class) class CapelinIntegrationTest { - /** - * The [TestCoroutineScope] to use. - */ - private lateinit var testScope: TestCoroutineScope - - /** - * The simulation clock to use. - */ - private lateinit var clock: Clock - /** * The monitor used to keep track of the metrics. */ @@ -73,37 +63,28 @@ class CapelinIntegrationTest { */ @BeforeEach fun setUp() { - testScope = TestCoroutineScope() - clock = DelayControllerClockAdapter(testScope) - monitor = TestExperimentReporter() } - /** - * Tear down the experimental environment. - */ - @AfterEach - fun tearDown() = testScope.cleanupTestCoroutines() - @Test - fun testLarge() { + fun testLarge() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val failures = false val seed = 0 val chan = Channel(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: ComputeService - lateinit var monitorResults: MonitorResults + lateinit var monitorResults: ComputeMetrics - testScope.launch { - scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy - ) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val meter: Meter = meterProvider.get("opendc-compute") + + withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( @@ -118,28 +99,28 @@ class CapelinIntegrationTest 
{ null } - monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() - monitor.close() } - runSimulation() + monitorResults = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") // Note that these values have been verified beforehand assertAll( { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, monitorResults.finishedVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, + { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, @@ -148,38 +129,35 @@ class CapelinIntegrationTest { } @Test - fun testSmall() { + fun testSmall() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seed = 1 val chan = Channel(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = 
createTestEnvironmentReader("single") - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy - ) - val monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - - yield() - - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") - - scheduler.close() - monitor.close() + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val meter: Meter = meterProvider.get("opendc-compute") + + withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } } - runSimulation() + val metrics = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") // Note that these values have been verified beforehand assertAll( @@ -190,11 +168,6 @@ class CapelinIntegrationTest { ) } - /** - * Run the simulation. - */ - private fun runSimulation() = testScope.advanceUntilIdle() - /** * Obtain the trace reader for the test. 
*/ diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 98e25be9..225200c9 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.sc18 +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.test.TestCoroutineScope import org.opendc.compute.service.ComputeService @@ -100,6 +101,7 @@ public class UnderspecificationExperiment : Experiment("underspecification") { val compute = ComputeService( testScope.coroutineContext, clock, + MeterProvider.noop().get("opendc-compute"), NumberOfActiveServersAllocationPolicy(), ) diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts index d07fe7a6..fcc78a83 100644 --- a/simulator/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc-runner-web/build.gradle.kts @@ -48,4 +48,6 @@ dependencies { runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}") runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}") + + implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 68ea3fb9..706efdc9 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -34,9 +34,13 @@ import com.mongodb.client.MongoClients import 
com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.bson.Document import org.bson.types.ObjectId @@ -45,17 +49,14 @@ import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy import org.opendc.compute.service.scheduler.RandomAllocationPolicy -import org.opendc.experiments.capelin.attachMonitor -import org.opendc.experiments.capelin.createComputeService -import org.opendc.experiments.capelin.createFailureDomain +import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.processTrace import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock import java.io.File -import kotlin.coroutines.coroutineContext import kotlin.random.Random private val logger = KotlinLogging.logger {} @@ -206,86 +207,86 @@ public class RunnerCli : CliktCommand(name = "runner") { traceReader: Sc20RawParquetTraceReader, performanceInterferenceReader: Sc20PerformanceInterferenceReader? 
): WebExperimentMonitor.Result { - val seed = repeat - val traceDocument = scenario.get("trace", Document::class.java) - val workloadName = traceDocument.getString("traceId") - val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() - - val seeder = Random(seed) - val testScope = TestCoroutineScope(Job(parent = coroutineContext[Job])) - val clock = DelayControllerClockAdapter(testScope) - - val chan = Channel(Channel.CONFLATED) - - val operational = scenario.get("operational", Document::class.java) - val allocationPolicy = - when (val policyName = operational.getString("schedulerName")) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - else -> throw IllegalArgumentException("Unknown policy $policyName") - } - - val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader( - listOf(traceReader), - performanceInterferenceModel, - Workload(workloadName, workloadFraction), - seed - ) - val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) - val environment = TopologyParser(topologies, topologyId) val monitor = WebExperimentMonitor() - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environment, - allocationPolicy - ) - - val failureDomain = if (operational.getBoolean("failuresEnabled")) { - logger.debug("ENABLING failures") - createFailureDomain( - testScope, - clock, - seeder.nextInt(), - 
operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, - scheduler, - chan - ) - } else { - null - } + try { + runBlockingTest { + val seed = repeat + val traceDocument = scenario.get("trace", Document::class.java) + val workloadName = traceDocument.getString("traceId") + val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() + + val seeder = Random(seed) + val clock = DelayControllerClockAdapter(this) + + val chan = Channel(Channel.CONFLATED) + + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val metricProducer = meterProvider as MetricProducer + val meter: Meter = meterProvider.get("opendc-compute") + + val operational = scenario.get("operational", Document::class.java) + val allocationPolicy = + when (val policyName = operational.getString("schedulerName")) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + else -> throw IllegalArgumentException("Unknown policy $policyName") + } - val monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - trace, - scheduler, - chan, - monitor - ) + val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader( + listOf(traceReader), + performanceInterferenceModel, + Workload(workloadName, workloadFraction), + seed + ) + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) 
+ val environment = TopologyParser(topologies, topologyId) + val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7 + + withComputeService(clock, meter, environment, allocationPolicy) { scheduler -> + val failureDomain = if (failureFrequency > 0) { + logger.debug { "ENABLING failures" } + createFailureDomain( + this, + clock, + seeder.nextInt(), + failureFrequency, + scheduler, + chan + ) + } else { + null + } - logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}" } + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + trace, + scheduler, + chan, + monitor + ) + } - failureDomain?.cancel() - scheduler.close() - } + failureDomain?.cancel() + } - try { - testScope.advanceUntilIdle() - testScope.uncaughtExceptions.forEach { it.printStackTrace() } - } finally { - monitor.close() - testScope.cancel() + val monitorResults = collectMetrics(metricProducer) + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } + } + } catch (cause: Throwable) { + logger.warn(cause) { "Experiment failed" } } return monitor.getResult() diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index a8ac6c10..fcd43ea7 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -25,7 +25,6 @@ package org.opendc.runner.web import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import 
org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -205,13 +204,22 @@ public class WebExperimentMonitor : ExperimentMonitor { private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() - override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { + override fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) { provisionerMetrics = AggregateProvisionerMetrics( - max(event.totalVmCount, provisionerMetrics.vmTotalCount), - max(event.waitingVmCount, provisionerMetrics.vmWaitingCount), - max(event.activeVmCount, provisionerMetrics.vmActiveCount), - max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount), - max(event.failedVmCount, provisionerMetrics.vmFailedCount), + max(totalVmCount, provisionerMetrics.vmTotalCount), + max(waitingVmCount, provisionerMetrics.vmWaitingCount), + max(activeVmCount, provisionerMetrics.vmActiveCount), + max(inactiveVmCount, provisionerMetrics.vmInactiveCount), + max(failedVmCount, provisionerMetrics.vmFailedCount), ) } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 1c0f94fd..2127b066 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -108,8 +108,11 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine 
.launchIn(this) launch { - source.consume(consumer) - job.cancel() + try { + source.consume(consumer) + } finally { + job.cancel() + } } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index f86c4198..830ff70e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -71,6 +71,7 @@ public class SimBareMetalMachine( override fun close() { super.close() + scheduler.close() scope.cancel() } diff --git a/simulator/opendc-telemetry/build.gradle.kts b/simulator/opendc-telemetry/build.gradle.kts new file mode 100644 index 00000000..7edfd134 --- /dev/null +++ b/simulator/opendc-telemetry/build.gradle.kts @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ diff --git a/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts new file mode 100644 index 00000000..d9a4b4dd --- /dev/null +++ b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +description = "Telemetry API for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api("io.opentelemetry:opentelemetry-api:${versions.otelApi}") + api("io.opentelemetry:opentelemetry-api-metrics:${versions.otelApiMetrics}") +} diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts new file mode 100644 index 00000000..350a0f74 --- /dev/null +++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +description = "Telemetry SDK for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-telemetry:opendc-telemetry-api")) + api("org.jetbrains.kotlinx:kotlinx-coroutines-core") + api("io.opentelemetry:opentelemetry-sdk:${versions.otelSdk}") + api("io.opentelemetry:opentelemetry-sdk-metrics:${versions.otelSdkMetrics}") + + implementation("io.github.microutils:kotlin-logging") +} diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt new file mode 100644 index 00000000..86f6647e --- /dev/null +++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.sdk + +import io.opentelemetry.sdk.common.Clock + +/** + * An adapter class that bridges a [java.time.Clock] to a [Clock] + */ +public class OtelClockAdapter(private val clock: java.time.Clock) : Clock { + override fun now(): Long = clock.millis() + + override fun nanoTime(): Long = clock.millis() * 1_000_000L +} + +/** + * Convert the specified [java.time.Clock] to a [Clock]. + */ +public fun java.time.Clock.toOtelClock(): Clock = OtelClockAdapter(this) diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt new file mode 100644 index 00000000..9ee16fac --- /dev/null +++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.sdk.metrics.export + +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import mu.KotlinLogging +import java.util.* +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +/** + * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every + * export interval. + * + * The reader runs in a [CoroutineScope] which enables collection of metrics in environments with a custom clock. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param producers The metric producers to gather metrics from. + * @param exporter The exporter to export the metrics to. + * @param exportInterval The export interval in milliseconds. + */ +public class CoroutineMetricReader( + scope: CoroutineScope, + private val producers: List, + private val exporter: MetricExporter, + private val exportInterval: Long = 60_000 +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + private val chan = Channel>(Channel.RENDEZVOUS) + + /** + * The metric reader job.
+ */ + private val readerJob = scope.launch { + while (isActive) { + delay(exportInterval) + + val metrics = mutableListOf() + for (producer in producers) { + metrics.addAll(producer.collectAllMetrics()) + } + chan.send(Collections.unmodifiableList(metrics)) + } + } + + /** + * The exporter job runs in the background to actually export the metrics. + */ + private val exporterJob = chan.consumeAsFlow() + .onEach { metrics -> + suspendCoroutine { cont -> + try { + val result = exporter.export(metrics) + result.whenComplete { + if (!result.isSuccess) { + logger.trace { "Exporter failed" } + } + cont.resume(Unit) + } + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + cont.resume(Unit) + } + } + } + .launchIn(scope) + + override fun close() { + readerJob.cancel() + exporterJob.cancel() + } +} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt index 91b22266..5e276edf 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt @@ -22,6 +22,7 @@ package org.opendc.workflow.service +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect @@ -85,7 +86,8 @@ internal class StageWorkflowSchedulerIntegrationTest { ) } - val compute = ComputeService(testScope.coroutineContext, clock, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + val meter = MeterProvider.noop().get("opendc-compute") + val compute = ComputeService(testScope.coroutineContext, clock, 
meter, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) hosts.forEach { compute.addHost(it) } diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index d5603664..73b4d8e7 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -39,5 +39,7 @@ include(":opendc-simulator:opendc-simulator-resources") include(":opendc-simulator:opendc-simulator-compute") include(":opendc-simulator:opendc-simulator-failures") include(":opendc-trace:opendc-trace-core") +include(":opendc-telemetry:opendc-telemetry-api") +include(":opendc-telemetry:opendc-telemetry-sdk") include(":opendc-harness") include(":opendc-utils") -- cgit v1.2.3 From b8ba3cf81da6367285c5d5a23a70f8c340a45fdd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 26 Mar 2021 15:30:22 +0100 Subject: compute: Integrate OpenTelemetry Metrics in OpenDC Workflow This change integrates the OpenTelemetry Metrics API in the OpenDC Workflow Service implementation. This replaces the old infrastructure for gathering metrics. 
--- .../opendc-experiments-sc18/.gitignore | 2 - .../opendc-experiments-sc18/build.gradle.kts | 38 ----- .../sc18/UnderspecificationExperiment.kt | 138 ------------------ .../org/opendc/experiments/sc18/WorkflowMetrics.kt | 86 ----------- .../opendc-workflow-service/build.gradle.kts | 2 + .../org/opendc/workflow/service/WorkflowService.kt | 12 +- .../service/internal/WorkflowServiceImpl.kt | 109 +++++++++++--- .../StageWorkflowSchedulerIntegrationTest.kt | 141 ------------------ .../service/WorkflowServiceIntegrationTest.kt | 161 +++++++++++++++++++++ .../src/test/resources/log4j2.xml | 3 + simulator/settings.gradle.kts | 1 - 11 files changed, 261 insertions(+), 432 deletions(-) delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc18/.gitignore delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt delete mode 100644 simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt delete mode 100644 simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt create mode 100644 simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt (limited to 'simulator') diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore b/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore deleted file mode 100644 index ba64707c..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -input/ -output/ diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts deleted file mode 100644 index 02e77c7c..00000000 --- 
a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2019 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -description = "Experiments for the SC18 article" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` - `experiment-conventions` -} - -dependencies { - api(platform(project(":opendc-platform"))) - api(project(":opendc-harness")) - implementation(project(":opendc-format")) - implementation(project(":opendc-workflow:opendc-workflow-service")) - implementation(project(":opendc-simulator:opendc-simulator-core")) - implementation(project(":opendc-compute:opendc-compute-simulator")) -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt deleted file mode 100644 index 225200c9..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc18 - -import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.* -import kotlinx.coroutines.test.TestCoroutineScope -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.harness.dsl.Experiment -import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import org.opendc.trace.core.enable -import org.opendc.workflow.service.WorkflowEvent -import org.opendc.workflow.service.WorkflowService -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode -import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy -import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy -import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import java.io.File -import java.io.FileInputStream -import kotlin.math.max - -/** - * The [UnderspecificationExperiment] investigates the impact of scheduler underspecification on performance. - * It focuses on components that must exist (that is, based on their own publications, the correct operation of the - * schedulers under study requires these components), yet have been left underspecified by their author. 
- */ -public class UnderspecificationExperiment : Experiment("underspecification") { - /** - * The workflow traces to test. - */ - private val trace: String by anyOf("input/traces/chronos_exp_noscaler_ca.gwf") - - /** - * The datacenter environments to test. - */ - private val environment: String by anyOf("input/environments/base.json") - - @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int) { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) - val recording = tracer.openRecording().run { - enable() - enable() - enable() - enable() - enable() - this - } - - testScope.launch { - launch { println("MAKESPAN: ${recording.workflowRuntime()}") } - launch { println("WAIT: ${recording.workflowWaitingTime()}") } - recording.start() - } - - testScope.launch { - val hosts = Sc18EnvironmentReader(FileInputStream(File(environment))) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - testScope.coroutineContext, - clock, - SimSpaceSharedHypervisorProvider() - ) - } - - val compute = ComputeService( - testScope.coroutineContext, - clock, - MeterProvider.noop().get("opendc-compute"), - NumberOfActiveServersAllocationPolicy(), - ) - - hosts.forEach { compute.addHost(it) } - - val scheduler = WorkflowService( - testScope.coroutineContext, - clock, - tracer, - compute.newClient(), - mode = WorkflowSchedulerMode.Batch(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - - val reader = GwfTraceReader(File(trace)) - - while (reader.hasNext()) { - val entry = reader.next() - delay(max(0, entry.start * 1000 - clock.millis())) - scheduler.submit(entry.workload) - } - } - - testScope.advanceUntilIdle() - recording.close() - - // Check whether everything went okay - 
testScope.uncaughtExceptions.forEach { it.printStackTrace() } - assert(testScope.uncaughtExceptions.isEmpty()) { "Errors occurred during execution of the experiment" } - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt deleted file mode 100644 index a8356888..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.experiments.sc18 - -import org.opendc.trace.core.EventStream -import org.opendc.trace.core.onEvent -import org.opendc.workflow.service.WorkflowEvent -import java.util.* -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine - -/** - * This function collects the makespan of workflows that appear in the event stream. - */ -public suspend fun EventStream.workflowRuntime(): Map = suspendCoroutine { cont -> - val starts = mutableMapOf() - val results = mutableMapOf() - - onEvent { - starts[it.job.uid] = it.timestamp - } - onEvent { - val start = starts.remove(it.job.uid) ?: return@onEvent - results[it.job.uid] = it.timestamp - start - } - onClose { cont.resume(results) } -} - -/** - * This function collects the waiting time of workflows that appear in the event stream, which the duration between the - * workflow submission and the start of the first task. - */ -public suspend fun EventStream.workflowWaitingTime(): Map = suspendCoroutine { cont -> - val starts = mutableMapOf() - val results = mutableMapOf() - - onEvent { - starts[it.job.uid] = it.timestamp - } - onEvent { - results.computeIfAbsent(it.job.uid) { _ -> - val start = starts.remove(it.job.uid)!! - it.timestamp - start - } - } - onClose { cont.resume(results) } -} - -/** - * This function collects the response time of tasks that appear in the event stream. 
- */ -public suspend fun EventStream.taskResponse(): Map = suspendCoroutine { cont -> - val starts = mutableMapOf() - val results = mutableMapOf() - - onEvent { - for (task in it.job.tasks) { - starts[task.uid] = it.timestamp - } - } - onEvent { - val start = starts.remove(it.job.uid) ?: return@onEvent - results[it.task.uid] = it.timestamp - start - } - onClose { cont.resume(results) } -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts index bec18ae9..5625c3d8 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -34,12 +34,14 @@ dependencies { api(project(":opendc-workflow:opendc-workflow-api")) api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-trace:opendc-trace-core")) + api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) testImplementation(project(":opendc-compute:opendc-compute-simulator")) testImplementation(project(":opendc-format")) + testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk")) testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") } diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 2f83e376..94302790 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ 
b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,6 +22,7 @@ package org.opendc.workflow.service +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.ComputeClient import org.opendc.trace.core.EventTracer @@ -42,14 +43,14 @@ import kotlin.coroutines.CoroutineContext */ public interface WorkflowService : AutoCloseable { /** - * The events emitted by the workflow scheduler. + * Submit the specified [Job] to the workflow service for scheduling. */ - public val events: Flow + public suspend fun submit(job: Job) /** - * Submit the specified [Job] to the workflow service for scheduling. + * Run the specified [Job] and suspend execution until the job is finished. */ - public suspend fun submit(job: Job) + public suspend fun run(job: Job) /** * Terminate the lifecycle of the workflow service, stopping all running workflows. @@ -63,6 +64,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. + * @param meter The meter to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. 
@@ -74,6 +76,7 @@ public interface WorkflowService : AutoCloseable { context: CoroutineContext, clock: Clock, tracer: EventTracer, + meter: Meter, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -85,6 +88,7 @@ public interface WorkflowService : AutoCloseable { context, clock, tracer, + meter, compute, mode, jobAdmissionPolicy, diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 85a88acd..1aef6f8e 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,12 +22,10 @@ package org.opendc.workflow.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel +import io.opentelemetry.api.metrics.Meter +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.trace.core.EventTracer @@ -43,7 +41,9 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock import java.util.* +import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -53,6 +53,7 @@ public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, internal val tracer: EventTracer, + private val meter: 
Meter, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -63,7 +64,7 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - internal val scope = CoroutineScope(context) + internal val scope = CoroutineScope(context + Job()) /** * The logger instance to use. @@ -105,6 +106,11 @@ public class WorkflowServiceImpl( */ internal val taskByServer = mutableMapOf() + /** + * The continuation of the jobs. + */ + private val conts = mutableMapOf>() + /** * The root listener of this scheduler. */ @@ -151,6 +157,54 @@ public class WorkflowServiceImpl( } } + /** + * The number of jobs that have been submitted to the service. + */ + private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + .setDescription("Number of submitted jobs") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + .setDescription("Number of jobs running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedJobs = meter.longCounterBuilder("jobs.finished") + .setDescription("Number of jobs that finished running") + .setUnit("1") + .build() + + /** + * The number of tasks that have been submitted to the service. + */ + private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + .setDescription("Number of submitted tasks") + .setUnit("1") + .build() + + /** + * The number of tasks that are running. + */ + private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + .setDescription("Number of tasks running") + .setUnit("1") + .build() + + /** + * The number of tasks that have finished running.
+ */ + private val finishedTasks = meter.longCounterBuilder("tasks.finished") + .setDescription("Number of tasks that finished running") + .setUnit("1") + .build() + private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic @@ -167,16 +221,7 @@ public class WorkflowServiceImpl( } } - override val events: Flow = tracer.openRecording().let { - it.enable() - it.enable() - it.enable() - it.enable() - it.enable() - it.consumeAsFlow().map { event -> event as WorkflowEvent } - } - - override suspend fun submit(job: Job) { + override suspend fun run(job: Job) { // J1 Incoming Jobs val jobInstance = JobState(job, clock.millis()) val instances = job.tasks.associateWith { @@ -193,14 +238,25 @@ public class WorkflowServiceImpl( if (instance.isRoot) { instance.state = TaskStatus.READY } + + submittedTasks.add(1) } - instances.values.toCollection(jobInstance.tasks) - incomingJobs += jobInstance - rootListener.jobSubmitted(jobInstance) - tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) + return suspendCancellableCoroutine { cont -> + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + conts[job] = cont + + submittedJobs.add(1) + tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) - requestCycle() + requestCycle() + } + } + + override suspend fun submit(job: Job) { + scope.launch { run(job) } } override fun close() { @@ -231,6 +287,8 @@ public class WorkflowServiceImpl( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance + + runningJobs.add(1) tracer.commit( WorkflowEvent.JobStarted( this, @@ -311,11 +369,11 @@ public class WorkflowServiceImpl( public override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { - ServerState.PROVISIONING -> { - } + ServerState.PROVISIONING -> {} ServerState.RUNNING -> { val task = 
taskByServer.getValue(server) task.startedAt = clock.millis() + runningTasks.add(1) tracer.commit( WorkflowEvent.TaskStarted( this@WorkflowServiceImpl, @@ -338,6 +396,9 @@ public class WorkflowServiceImpl( task.finishedAt = clock.millis() job.tasks.remove(task) activeTasks -= task + + runningTasks.add(-1) + finishedTasks.add(1) tracer.commit( WorkflowEvent.TaskFinished( this@WorkflowServiceImpl, @@ -371,8 +432,12 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job + runningJobs.add(-1) + finishedJobs.add(1) tracer.commit(WorkflowEvent.JobFinished(this, job.job)) rootListener.jobFinished(job) + + conts.remove(job.job)?.resume(Unit) } public fun addListener(listener: WorkflowSchedulerListener) { diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt deleted file mode 100644 index 5e276edf..00000000 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNotEquals -import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import org.opendc.workflow.service.internal.WorkflowServiceImpl -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode -import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy -import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy -import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import 
kotlin.math.max - -/** - * Integration test suite for the [WorkflowServiceImpl]. - */ -@DisplayName("WorkflowServiceImpl") -@OptIn(ExperimentalCoroutinesApi::class) -internal class StageWorkflowSchedulerIntegrationTest { - /** - * A large integration test where we check whether all tasks in some trace are executed correctly. - */ - @Test - fun testTrace() { - var jobsSubmitted = 0L - var jobsStarted = 0L - var jobsFinished = 0L - var tasksStarted = 0L - var tasksFinished = 0L - - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) - - val scheduler = let { - val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - testScope.coroutineContext, - clock, - SimSpaceSharedHypervisorProvider() - ) - } - - val meter = MeterProvider.noop().get("opendc-compute") - val compute = ComputeService(testScope.coroutineContext, clock, meter, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) - - hosts.forEach { compute.addHost(it) } - - WorkflowService( - testScope.coroutineContext, - clock, - tracer, - compute.newClient(), - mode = WorkflowSchedulerMode.Batch(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - } - - testScope.launch { - scheduler.events - .onEach { event -> - when (event) { - is WorkflowEvent.JobStarted -> jobsStarted++ - is WorkflowEvent.JobFinished -> jobsFinished++ - is WorkflowEvent.TaskStarted -> tasksStarted++ - is WorkflowEvent.TaskFinished -> tasksFinished++ - } - } - .collect() - } - - testScope.launch { - val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) - - while (reader.hasNext()) { - val entry = reader.next() - jobsSubmitted++ - 
delay(max(0, entry.start - clock.millis())) - scheduler.submit(entry.workload) - } - } - - testScope.advanceUntilIdle() - - assertAll( - { assertEquals(emptyList(), testScope.uncaughtExceptions) }, - { assertNotEquals(0, jobsSubmitted, "No jobs submitted") }, - { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") }, - { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") }, - { assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") } - ) - } -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt new file mode 100644 index 00000000..28fe76c7 --- /dev/null +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHost +import org.opendc.format.environment.sc18.Sc18EnvironmentReader +import org.opendc.format.trace.gwf.GwfTraceReader +import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.trace.core.EventTracer +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import kotlin.math.max + +/** + * Integration test suite for the [WorkflowServiceImpl]. 
+ */ +@DisplayName("WorkflowServiceImpl") +@OptIn(ExperimentalCoroutinesApi::class) +internal class WorkflowServiceIntegrationTest { + /** + * A large integration test where we check whether all tasks in some trace are executed correctly. + */ + @Test + fun testTrace() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val tracer = EventTracer(clock) + + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) + .use { it.read() } + .map { def -> + SimHost( + def.uid, + def.name, + def.model, + def.meta, + coroutineContext, + clock, + SimSpaceSharedHypervisorProvider() + ) + } + + val meter = MeterProvider.noop().get("opendc-compute") + val compute = ComputeService(coroutineContext, clock, meter, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + + hosts.forEach { compute.addHost(it) } + + val scheduler = WorkflowService( + coroutineContext, + clock, + tracer, + meterProvider.get("opendc-workflow"), + compute.newClient(), + mode = WorkflowSchedulerMode.Batch(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ) + + val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) + var offset = Long.MIN_VALUE + + coroutineScope { + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + delay(max(0, (entry.start - offset) - clock.millis())) + launch { + scheduler.run(entry.workload) + } + } + } + + hosts.forEach(SimHost::close) + scheduler.close() + compute.close() + + val metrics = collectMetrics(meterProvider as MetricProducer) + + assertAll( + { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, + { assertEquals(0, 
metrics.jobsActive, "Not all submitted jobs started") }, + { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, + { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") } + ) + } + + class WorkflowMetrics { + var jobsSubmitted = 0L + var jobsActive = 0L + var jobsFinished = 0L + var tasksSubmitted = 0L + var tasksActive = 0L + var tasksFinished = 0L + } + + /** + * Collect the metrics of the workflow service. + */ + private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics { + val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + val res = WorkflowMetrics() + res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.last()?.value ?: 0 + res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.last()?.value ?: 0 + res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.last()?.value ?: 0 + return res + } +} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml index 70a0eacc..edcf41ed 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml @@ -28,6 +28,9 @@ + + + diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index 73b4d8e7..a7a55b1c 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -31,7 +31,6 @@ include(":opendc-serverless:opendc-serverless-api") 
include(":opendc-serverless:opendc-serverless-service") include(":opendc-serverless:opendc-serverless-simulator") include(":opendc-format") -include(":opendc-experiments:opendc-experiments-sc18") include(":opendc-experiments:opendc-experiments-capelin") include(":opendc-runner-web") include(":opendc-simulator:opendc-simulator-core") -- cgit v1.2.3 From be95e0b10b648e256c4eb8c2190ad6ed61f8afdd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 26 Mar 2021 15:36:44 +0100 Subject: workflow: Remove event tracer from workflow service This change removes the event tracer from the OpenDC Workflow service as we start migrating to the industry standard OpenTelemetry. --- .../org/opendc/compute/service/ComputeService.kt | 1 - .../opendc-workflow-service/build.gradle.kts | 1 - .../org/opendc/workflow/service/WorkflowEvent.kt | 79 ---------------------- .../org/opendc/workflow/service/WorkflowService.kt | 4 -- .../service/internal/WorkflowServiceImpl.kt | 27 -------- .../service/WorkflowServiceIntegrationTest.kt | 3 - 6 files changed, 115 deletions(-) delete mode 100644 simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt (limited to 'simulator') diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 4bc0ba78..98566da3 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -23,7 +23,6 @@ package org.opendc.compute.service import io.opentelemetry.api.metrics.Meter -import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl 
diff --git a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts index 5625c3d8..040a9ff6 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -33,7 +33,6 @@ dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-workflow:opendc-workflow-api")) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-trace:opendc-trace-core")) api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt deleted file mode 100644 index bb2ad6c6..00000000 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import org.opendc.trace.core.Event -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task - -/** - * An event emitted by the [WorkflowService]. - */ -public sealed class WorkflowEvent : Event() { - /** - * The [WorkflowService] that emitted the event. - */ - public abstract val service: WorkflowService - - /** - * This event is emitted when a job was submitted to the scheduler. - */ - public data class JobSubmitted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has become active. - */ - public data class JobStarted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has finished processing. - */ - public data class JobFinished( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. - */ - public data class TaskStarted( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. 
- */ - public data class TaskFinished( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 94302790..d3358ef1 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -23,9 +23,7 @@ package org.opendc.workflow.service import io.opentelemetry.api.metrics.Meter -import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.ComputeClient -import org.opendc.trace.core.EventTracer import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode @@ -75,7 +73,6 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - tracer: EventTracer, meter: Meter, compute: ComputeClient, mode: WorkflowSchedulerMode, @@ -87,7 +84,6 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - tracer, meter, compute, mode, diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 1aef6f8e..32191b8f 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -24,13 +24,9 
@@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import mu.KotlinLogging import org.opendc.compute.api.* -import org.opendc.trace.core.EventTracer -import org.opendc.trace.core.consumeAsFlow -import org.opendc.trace.core.enable import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* @@ -52,7 +48,6 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, - internal val tracer: EventTracer, private val meter: Meter, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, @@ -249,7 +244,6 @@ public class WorkflowServiceImpl( conts[job] = cont submittedJobs.add(1) - tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) requestCycle() } @@ -289,12 +283,6 @@ public class WorkflowServiceImpl( activeJobs += jobInstance runningJobs.add(1) - tracer.commit( - WorkflowEvent.JobStarted( - this, - jobInstance.job - ) - ) rootListener.jobStarted(jobInstance) } @@ -374,13 +362,6 @@ public class WorkflowServiceImpl( val task = taskByServer.getValue(server) task.startedAt = clock.millis() runningTasks.add(1) - tracer.commit( - WorkflowEvent.TaskStarted( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) rootListener.taskStarted(task) } ServerState.TERMINATED, ServerState.ERROR -> { @@ -399,13 +380,6 @@ public class WorkflowServiceImpl( runningTasks.add(-1) finishedTasks.add(1) - tracer.commit( - WorkflowEvent.TaskFinished( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -434,7 +408,6 @@ public class WorkflowServiceImpl( activeJobs -= job runningJobs.add(-1) finishedJobs.add(1) - tracer.commit(WorkflowEvent.JobFinished(this, job.job)) rootListener.jobFinished(job) 
conts.remove(job.job)?.resume(Unit) diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index 28fe76c7..e06e5eb3 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -42,7 +42,6 @@ import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.telemetry.sdk.toOtelClock -import org.opendc.trace.core.EventTracer import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy @@ -63,7 +62,6 @@ internal class WorkflowServiceIntegrationTest { @Test fun testTrace() = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val tracer = EventTracer(clock) val meterProvider: MeterProvider = SdkMeterProvider .builder() @@ -92,7 +90,6 @@ internal class WorkflowServiceIntegrationTest { val scheduler = WorkflowService( coroutineContext, clock, - tracer, meterProvider.get("opendc-workflow"), compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), -- cgit v1.2.3 From b0bdbba1c0d9fa1f90eb2831b53340a77b87d949 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 26 Mar 2021 15:38:35 +0100 Subject: tracer: Remove event tracer from repository This change removes the event tracer from the repository as we migrate to the industry standard OpenTelemetry. 
--- .../opendc-serverless-service/build.gradle.kts | 1 - simulator/opendc-trace/build.gradle.kts | 21 --- .../opendc-trace-core/build.gradle.kts | 33 ----- .../src/main/kotlin/org/opendc/trace/core/Event.kt | 34 ----- .../kotlin/org/opendc/trace/core/EventStream.kt | 76 ---------- .../kotlin/org/opendc/trace/core/EventTracer.kt | 84 ----------- .../kotlin/org/opendc/trace/core/Extensions.kt | 73 ---------- .../org/opendc/trace/core/RecordingStream.kt | 52 ------- .../trace/core/internal/AbstractEventStream.kt | 139 ------------------ .../org/opendc/trace/core/internal/Dispatcher.kt | 77 ---------- .../opendc/trace/core/internal/EventDispatcher.kt | 44 ------ .../opendc/trace/core/internal/EventTracerImpl.kt | 157 --------------------- .../org/opendc/trace/core/internal/StreamState.kt | 30 ---- simulator/settings.gradle.kts | 1 - 14 files changed, 822 deletions(-) delete mode 100644 simulator/opendc-trace/build.gradle.kts delete mode 100644 simulator/opendc-trace/opendc-trace-core/build.gradle.kts delete mode 100644 simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt delete mode 100644 simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt delete mode 100644 simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt delete mode 100644 simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt delete mode 100644 simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt delete mode 100644 simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt delete mode 100644 simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt delete mode 100644 simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt delete mode 100644 
simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt delete mode 100644 simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt (limited to 'simulator') diff --git a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts index bcb08be7..0221829a 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts +++ b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts @@ -32,7 +32,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-serverless:opendc-serverless-api")) - api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") diff --git a/simulator/opendc-trace/build.gradle.kts b/simulator/opendc-trace/build.gradle.kts deleted file mode 100644 index a1a751a2..00000000 --- a/simulator/opendc-trace/build.gradle.kts +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ diff --git a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts b/simulator/opendc-trace/opendc-trace-core/build.gradle.kts deleted file mode 100644 index 3051f733..00000000 --- a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -description = "Event tracing library for OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(platform(project(":opendc-platform"))) - api("org.jetbrains.kotlinx:kotlinx-coroutines-core") -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt deleted file mode 100644 index 1f4bb267..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core - -/** - * Base class for events reported by the OpenDC tracing library. - */ -public abstract class Event(timestamp: Long = Long.MIN_VALUE) { - /** - * The timestamp at which the event has occurred. 
- */ - public var timestamp: Long = timestamp - internal set -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt deleted file mode 100644 index ac2b5e9b..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core - -/** - * A stream of [Event]s. - */ -public interface EventStream : AutoCloseable { - /** - * Register the specified [action] to be performed on every event in the stream. - */ - public fun onEvent(action: (Event) -> Unit) - - /** - * Register the specified [action] to be performed on events of type [E]. 
- */ - public fun onEvent(type: Class, action: (E) -> Unit) - - /** - * Register the specified [action] to be performed on errors. - */ - public fun onError(action: (Throwable) -> Unit) - - /** - * Register the specified [action] to be performed when the stream is closed. - */ - public fun onClose(action: Runnable) - - /** - * Unregister the specified [action]. - * - * @return `true` if an action was unregistered, `false` otherwise. - */ - public fun remove(action: Any): Boolean - - /** - * Start the processing of events in the current coroutine. - * - * @throws IllegalStateException if the stream was already started. - */ - public suspend fun start() - - /** - * Release all resources associated with this stream. - * - * @throws IllegalStateException if the stream was already stopped. - */ - public override fun close() -} - -/** - * Register the specified [action] to be performed on events of type [E]. - */ -public inline fun EventStream.onEvent(noinline action: (E) -> Unit) { - onEvent(E::class.java, action) -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt deleted file mode 100644 index 4f978f4f..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in 
all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core - -import org.opendc.trace.core.internal.EventTracerImpl -import java.time.Clock - -/** - * An [EventTracer] is responsible for recording the events that occur in a system. - */ -public interface EventTracer : AutoCloseable { - /** - * The [Clock] used to measure the timestamp and duration of the events. - */ - public val clock: Clock - - /** - * Determine whether the specified [Event] class is currently enabled in any of the active recordings. - * - * @return `true` if the event is enabled, `false` otherwise. - */ - public fun isEnabled(type: Class): Boolean - - /** - * Commit the specified [event] to the appropriate event streams. - */ - public fun commit(event: Event) - - /** - * Create a new [RecordingStream] which is able to actively capture events emitted to the [EventTracer]. - */ - public fun openRecording(): RecordingStream - - /** - * Terminate the lifecycle of the [EventTracer] and close its associated event streams. - */ - public override fun close() - - public companion object { - /** - * Construct a new [EventTracer] instance. - * - * @param clock The [Clock] used to measure the timestamps. - */ - @JvmName("create") - public operator fun invoke(clock: Clock): EventTracer = EventTracerImpl(clock) - } -} - -/** - * Determine whether the [Event] of type [E] is currently enabled in any of the active recordings. - * - * @return `true` if the event is enabled, `false` otherwise. 
- */ -public inline fun EventTracer.isEnabled(): Boolean = isEnabled(E::class.java) - -/** - * Lazily construct an [Event] of type [E] if it is enabled and commit it to the appropriate event streams. - */ -public inline fun EventTracer.commit(block: () -> E) { - if (isEnabled()) { - commit(block()) - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt deleted file mode 100644 index 84dcc61a..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.core - -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.channels.sendBlocking -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.callbackFlow - -/** - * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public fun EventStream.asFlow(): Flow = callbackFlow { - onEvent { sendBlocking(it) } - onError { cancel(CancellationException("API error", it)) } - onClose { channel.close() } - awaitClose { this@asFlow.close() } -} - -/** - * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public fun EventStream.consumeAsFlow(): Flow = callbackFlow { - onEvent { sendBlocking(it) } - onError { cancel(CancellationException("API error", it)) } - start() -} - -/** - * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public inline fun EventStream.asTypedFlow(): Flow = callbackFlow { - onEvent { sendBlocking(it) } - onError { cancel(CancellationException("API error", it)) } - onClose { channel.close() } - awaitClose { this@asTypedFlow.close() } -} - -/** - * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream. 
- */ -@OptIn(ExperimentalCoroutinesApi::class) -public inline fun EventStream.consumeAsTypedFlow(): Flow = callbackFlow { - onEvent { sendBlocking(it) } - onError { cancel(CancellationException("API error", it)) } - start() -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt deleted file mode 100644 index f49e7c49..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core - -/** - * A recording stream that produces events from an [EventTracer]. - */ -public interface RecordingStream : EventStream { - /** - * Enable recording of the specified event [type]. 
- */ - public fun enable(type: Class) - - /** - * Disable recording of the specified event [type] - */ - public fun disable(type: Class) -} - -/** - * Enable recording of events of type [E]. - */ -public inline fun RecordingStream.enable() { - enable(E::class.java) -} - -/** - * Disable recording of events of type [E]. - */ -public inline fun RecordingStream.disable() { - enable(E::class.java) -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt deleted file mode 100644 index fac99664..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.core.internal - -import kotlinx.coroutines.suspendCancellableCoroutine -import org.opendc.trace.core.Event -import org.opendc.trace.core.EventStream -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume - -/** - * Base implementation of the [EventStream] implementation. - */ -internal abstract class AbstractEventStream : EventStream { - /** - * The state of the stream. - */ - protected var state = StreamState.Pending - - /** - * The event actions to dispatch to. - */ - private val eventActions = mutableListOf() - - /** - * The error actions to use. - */ - private val errorActions = mutableListOf<(Throwable) -> Unit>() - - /** - * The close actions to use. - */ - private val closeActions = mutableListOf() - - /** - * The continuation that is invoked when the stream closes. - */ - private var cont: Continuation? = null - - /** - * Dispatch the specified [event] to this stream. - */ - fun dispatch(event: Event) { - val actions = eventActions - - // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type) - for (action in actions) { - if (!action.accepts(event)) { - continue - } - - try { - action(event) - } catch (e: Exception) { - handleError(e) - } - } - } - - /** - * Handle the specified [throwable] that occurred while dispatching an event. 
- */ - private fun handleError(throwable: Throwable) { - val actions = errorActions - - // Default exception handler - if (actions.isEmpty()) { - throwable.printStackTrace() - return - } - - for (action in actions) { - action(throwable) - } - } - - override fun onEvent(action: (Event) -> Unit) { - eventActions += EventDispatcher(null, action) - } - - override fun onEvent(type: Class, action: (E) -> Unit) { - @Suppress("UNCHECKED_CAST") // This cast must succeed - eventActions += EventDispatcher(type, action as (Event) -> Unit) - } - - override fun onError(action: (Throwable) -> Unit) { - errorActions += action - } - - override fun onClose(action: Runnable) { - closeActions += action - } - - override fun remove(action: Any): Boolean { - return eventActions.removeIf { it.action == action } || errorActions.remove(action) || closeActions.remove(action) - } - - override suspend fun start() { - check(state == StreamState.Pending) { "Stream has already started/closed" } - - state = StreamState.Started - - return suspendCancellableCoroutine { cont -> this.cont = cont } - } - - override fun close() { - if (state == StreamState.Closed) { - return - } - - state = StreamState.Closed - cont?.resume(Unit) - - val actions = closeActions - for (action in actions) { - action.run() - } - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt deleted file mode 100644 index 8b6de75e..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * 
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core.internal - -import org.opendc.trace.core.Event - -/** - * The [Dispatcher] is responsible for dispatching events onto configured actions. - */ -internal class Dispatcher { - /** - * The event actions to dispatch to. - */ - private val eventActions = mutableListOf() - - /** - * The error actions to use. - */ - private val errorActions = mutableListOf<(Throwable) -> Unit>() - - /** - * Dispatch the specified [event]. - */ - fun dispatch(event: Event) { - val actions = eventActions - - // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type) - for (action in actions) { - if (!action.accepts(event)) { - continue - } - - try { - action(event) - } catch (e: Exception) { - handleError(e) - } - } - } - - /** - * Handle the specified [throwable] that occurred while dispatching an event. 
- */ - private fun handleError(throwable: Throwable) { - val actions = errorActions - - // Default exception handler - if (actions.isEmpty()) { - throwable.printStackTrace() - return - } - - for (action in actions) { - action(throwable) - } - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt deleted file mode 100644 index b2a662eb..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core.internal - -import org.opendc.trace.core.Event - -/** - * A dispatcher responsible for conditionally dispatching an event. 
- */ -internal class EventDispatcher(val type: Class?, val action: (Event) -> Unit) { - /** - * Determine whether this dispatcher accepts the specified event. - */ - fun accepts(event: Event): Boolean { - return type == null || type.isAssignableFrom(event.javaClass) - } - - /** - * Invoke the specified [event] on this action. - */ - operator fun invoke(event: Event) { - action(event) - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt deleted file mode 100644 index e85d0779..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.core.internal - -import org.opendc.trace.core.Event -import org.opendc.trace.core.EventTracer -import org.opendc.trace.core.RecordingStream -import java.lang.reflect.Modifier -import java.time.Clock -import java.util.* - -/** - * Default implementation of the [EventTracer] interface. - */ -internal class EventTracerImpl(override val clock: Clock) : EventTracer { - /** - * The set of enabled events. - */ - private val enabledEvents = IdentityHashMap, MutableList>() - - /** - * The event streams created by the tracer. - */ - private val streams = WeakHashMap() - - /** - * A flag to indicate that the stream is closed. - */ - private var isClosed: Boolean = false - - override fun isEnabled(type: Class): Boolean = enabledEvents.containsKey(type) - - override fun commit(event: Event) { - val type = event.javaClass - - // Assign timestamp if not set - if (event.timestamp == Long.MIN_VALUE) { - event.timestamp = clock.millis() - } - - if (!isEnabled(type) || isClosed) { - return - } - - val streams = enabledEvents[type] ?: return - for (stream in streams) { - stream.dispatch(event) - } - } - - override fun openRecording(): RecordingStream = Stream() - - override fun close() { - isClosed = true - - val streams = streams - for ((stream, _) in streams) { - stream.close() - } - - enabledEvents.clear() - } - - /** - * Enable the specified [type] for the given [stream]. - */ - private fun enableFor(type: Class, stream: Stream) { - val res = enabledEvents.computeIfAbsent(type) { mutableListOf() } - res.add(stream) - } - - /** - * Disable the specified [type] for the given [stream]. - */ - private fun disableFor(type: Class, stream: Stream) { - enabledEvents[type]?.remove(stream) - } - - /** - * The [RecordingStream] associated with this [EventTracer] implementation. - */ - private inner class Stream : AbstractEventStream(), RecordingStream { - /** - * The set of enabled events for this stream. 
- */ - private val enabledEvents = IdentityHashMap, Unit>() - - init { - streams[this] = Unit - } - - override fun enable(type: Class) { - validateEventClass(type) - - if (enabledEvents.put(type, Unit) == null && state == StreamState.Started) { - enableFor(type, this) - } - } - - override fun disable(type: Class) { - validateEventClass(type) - - if (enabledEvents.remove(type) != null && state == StreamState.Started) { - disableFor(type, this) - } - } - - override suspend fun start() { - val enabledEvents = enabledEvents - for ((event, _) in enabledEvents) { - enableFor(event, this) - } - - super.start() - } - - override fun close() { - val enabledEvents = enabledEvents - for ((event, _) in enabledEvents) { - disableFor(event, this) - } - - // Remove this stream from the active streams - streams.remove(this) - - super.close() - } - - /** - * Validate the specified event subclass. - */ - private fun validateEventClass(type: Class) { - require(!Modifier.isAbstract(type.modifiers)) { "Abstract event classes are not allowed" } - require(Event::class.java.isAssignableFrom(type)) { "Must be subclass to ${Event::class.qualifiedName}" } - } - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt deleted file mode 100644 index 9f411e0d..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the 
Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core.internal - -/** - * The state of a [Stream]. - */ -internal enum class StreamState { - Pending, Started, Closed -} diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index a7a55b1c..2e297997 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -37,7 +37,6 @@ include(":opendc-simulator:opendc-simulator-core") include(":opendc-simulator:opendc-simulator-resources") include(":opendc-simulator:opendc-simulator-compute") include(":opendc-simulator:opendc-simulator-failures") -include(":opendc-trace:opendc-trace-core") include(":opendc-telemetry:opendc-telemetry-api") include(":opendc-telemetry:opendc-telemetry-sdk") include(":opendc-harness") -- cgit v1.2.3 From ccd1f96f8568978c80aa0b9a100ca6158ade34ba Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 26 Mar 2021 16:30:09 +0100 Subject: simulator: Cache remaining work This change updates the resource model implementation to cache the remaining work field, which was being computed multiple times during the same cycle. 
--- .../simulator/resources/SimAbstractResourceContext.kt | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'simulator') diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 9705bd17..5c5ee038 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -54,8 +54,17 @@ public abstract class SimAbstractResourceContext( override val remainingWork: Double get() { val activeCommand = activeCommand ?: return 0.0 - return computeRemainingWork(activeCommand, clock.millis()) + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + computeRemainingWork(activeCommand, now).also { _remainingWork = it } + } else { + _remainingWork + } } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE /** * A flag to indicate the state of the context. @@ -201,6 +210,9 @@ public abstract class SimAbstractResourceContext( // Flush may not be called when the resource consumer has finished throw IllegalStateException() } + + // Flush remaining work cache + _remainingWorkFlush = Long.MIN_VALUE } catch (cause: Throwable) { doStop(cause) } finally { -- cgit v1.2.3 From 5b0eaf76ec00192c755b268b7655f6463c5bc62f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 27 Mar 2021 12:33:13 +0100 Subject: compute: Migrate compute service simulator to OpenTelemetry This change updates the compute service simulator to use OpenTelemetry for reporting metrics of the (simulated) hosts as opposed to using custom event flows. 
This approach is more generic, flexible and possibly offers better performance as we can collect metrics of all services in a single sweep, as opposed to listening to several services and each invoking the handlers. --- .../org/opendc/compute/service/driver/Host.kt | 6 - .../org/opendc/compute/service/driver/HostEvent.kt | 72 --------- .../opendc-compute-simulator/build.gradle.kts | 1 + .../kotlin/org/opendc/compute/simulator/SimHost.kt | 117 +++++++++++--- .../org/opendc/compute/simulator/SimHostTest.kt | 142 ++++++++++------- .../experiments/capelin/ExperimentHelpers.kt | 72 ++------- .../org/opendc/experiments/capelin/Portfolio.kt | 4 +- .../capelin/monitor/ExperimentMetricExporter.kt | 171 +++++++++++++++++++++ .../capelin/monitor/ExperimentMonitor.kt | 3 +- .../experiments/capelin/CapelinIntegrationTest.kt | 9 +- .../src/main/kotlin/org/opendc/runner/web/Main.kt | 4 +- .../simulator/resources/SimResourceBenchmarks.kt | 2 +- .../main/kotlin/org/opendc/utils/flow/EventFlow.kt | 112 -------------- .../service/WorkflowServiceIntegrationTest.kt | 1 + 14 files changed, 368 insertions(+), 348 deletions(-) delete mode 100644 simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt create mode 100644 simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt delete mode 100644 simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt (limited to 'simulator') diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index c3c39572..bed15dfd 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ 
b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -22,7 +22,6 @@ package org.opendc.compute.service.driver -import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.Server import java.util.* @@ -55,11 +54,6 @@ public interface Host { */ public val meta: Map - /** - * The events emitted by the driver. - */ - public val events: Flow - /** * Determine whether the specified [instance][server] can still fit on this host. */ diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt deleted file mode 100644 index 97350679..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.driver - -/** - * An event that is emitted by a [Host]. - */ -public sealed class HostEvent { - /** - * The driver that emitted the event. - */ - public abstract val driver: Host - - /** - * This event is emitted when the number of active servers on the server managed by this driver is updated. - * - * @property driver The driver that emitted the event. - * @property numberOfActiveServers The number of active servers. - * @property availableMemory The available memory, in MB. - */ - public data class VmsUpdated( - override val driver: Host, - public val numberOfActiveServers: Int, - public val availableMemory: Long - ) : HostEvent() - - /** - * This event is emitted when a slice is finished. - * - * @property driver The driver that emitted the event. - * @property requestedBurst The total requested CPU time (can be above capacity). - * @property grantedBurst The actual total granted capacity, which might be lower than the requested burst due to - * the hypervisor being interrupted during a slice. - * @property overcommissionedBurst The CPU time that the hypervisor could not grant to the virtual machine since - * it did not have the capacity. - * @property interferedBurst The sum of CPU time that virtual machines could not utilize due to performance - * interference. - * @property cpuUsage CPU use in megahertz. - * @property cpuDemand CPU demand in megahertz. - * @property numberOfDeployedImages The number of images deployed on this hypervisor. 
- */ - public data class SliceFinished( - override val driver: Host, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val numberOfDeployedImages: Int, - ) : HostEvent() -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts index 1ad3f1c6..3bf8a114 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -38,5 +38,6 @@ dependencies { implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) + testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk")) testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 89784803..6d81aa7d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,8 +22,9 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.common.Labels import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server @@ -36,7 +37,6 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.MachinePowerModel import org.opendc.simulator.failures.FailureDomain 
-import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext @@ -52,6 +52,7 @@ public class SimHost( override val meta: Map, context: CoroutineContext, clock: Clock, + meter: Meter, hypervisor: SimHypervisorProvider, powerModel: MachinePowerModel = ConstantPowerModel(0.0), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), @@ -66,10 +67,6 @@ public class SimHost( */ private val logger = KotlinLogging.logger {} - override val events: Flow - get() = _events - internal val _events = EventFlow() - /** * The event listeners registered with this host. */ @@ -99,18 +96,13 @@ public class SimHost( cpuUsage: Double, cpuDemand: Double ) { - _events.emit( - HostEvent.SliceFinished( - this@SimHost, - requestedWork, - grantedWork, - overcommittedWork, - interferedWork, - cpuUsage, - cpuDemand, - guests.size - ) - ) + _cpuWork.record(requestedWork.toDouble()) + _cpuWorkGranted.record(grantedWork.toDouble()) + _cpuWorkOvercommit.record(overcommittedWork.toDouble()) + _cpuWorkInterference.record(interferedWork.toDouble()) + _cpuUsage.record(cpuUsage) + _cpuDemand.record(cpuDemand) + _cpuPower.record(machine.powerDraw.value) } } ) @@ -132,6 +124,87 @@ public class SimHost( override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) + /** + * The number of guests on the host. + */ + private val _guests = meter.longUpDownCounterBuilder("guests.total") + .setDescription("Number of guests") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The number of active guests on the host. + */ + private val _activeGuests = meter.longUpDownCounterBuilder("guests.active") + .setDescription("Number of active guests") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The CPU usage on the host. 
+ */ + private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage") + .setDescription("The amount of CPU resources used by the host") + .setUnit("MHz") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The CPU demand on the host. + */ + private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand") + .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The requested work for the CPU. + */ + private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage") + .setDescription("The amount of power used by the CPU") + .setUnit("W") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The requested work for the CPU. + */ + private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total") + .setDescription("The amount of work supplied to the CPU") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work actually performed by the CPU. + */ + private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted") + .setDescription("The amount of work performed by the CPU") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work that could not be performed by the CPU due to overcommitting resource. + */ + private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit") + .setDescription("The amount of work not performed by the CPU due to overcommitment") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work that could not be performed by the CPU due to interference. 
+ */ + private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference") + .setDescription("The amount of work not performed by the CPU due to interference") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + init { // Launch hypervisor onto machine scope.launch { @@ -166,12 +239,11 @@ public class SimHost( require(canFit(server)) { "Server does not fit" } val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) guests[server] = guest + _guests.add(1) if (start) { guest.start() } - - _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override fun contains(server: Server): Boolean { @@ -191,6 +263,7 @@ public class SimHost( override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return guest.terminate() + _guests.add(-1) } override fun addListener(listener: HostListener) { @@ -228,6 +301,7 @@ public class SimHost( } } + _activeGuests.add(1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } } @@ -238,9 +312,8 @@ public class SimHost( } } + _activeGuests.add(-1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - - _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override suspend fun fail() { diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index e311cd21..830fc868 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,12 +22,14 @@ package org.opendc.compute.simulator -import kotlinx.coroutines.ExperimentalCoroutinesApi -import 
kotlinx.coroutines.delay -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -37,7 +39,8 @@ import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher -import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -45,23 +48,20 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.time.Clock +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.sdk.toOtelClock import java.util.UUID +import kotlin.coroutines.resume /** * Basic test-suite for the hypervisor. 
*/ @OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @BeforeEach fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( @@ -74,72 +74,98 @@ internal class SimHostTest { * Test overcommitting of resources by the hypervisor. */ @Test - fun testOvercommitted() { + fun testOvercommitted() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) var requestedWork = 0L var grantedWork = 0L var overcommittedWork = 0L - scope.launch { - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider()) - val duration = 5 * 60L - val vmImageA = MockImage( - UUID.randomUUID(), - "", - emptyMap(), - mapOf( - "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) - ), - ) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val duration = 5 * 60L + val vmImageA = MockImage( + UUID.randomUUID(), + "", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) + ), ) ) - val vmImageB = MockImage( - UUID.randomUUID(), - "", - emptyMap(), - 
mapOf( - "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) - ) + ) + val vmImageB = MockImage( + UUID.randomUUID(), + "", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) ) ) ) + ) - delay(5) - - val flavor = MockFlavor(2, 0) - virtDriver.events - .onEach { event -> - when (event) { - is HostEvent.SliceFinished -> { - requestedWork += event.requestedBurst - grantedWork += event.grantedBurst - overcommittedWork += event.overcommissionedBurst - } - } + val flavor = MockFlavor(2, 0) + + // Setup metric reader + val reader = CoroutineMetricReader( + this, listOf(meterProvider as MetricProducer), + object : MetricExporter { + override fun export(metrics: Collection): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong() + grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong() + overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong() + return CompletableResultCode.ofSuccess() } - .launchIn(this) + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + }, + exportInterval = duration * 1000 + ) + + coroutineScope { launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } + + 
suspendCancellableCoroutine { cont -> + virtDriver.addListener(object : HostListener { + private var finished = 0 + + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED && ++finished == 2) { + cont.resume(Unit) + } + } + }) + } } - scope.advanceUntilIdle() + // Ensure last cycle is collected + delay(1000 * duration) + virtDriver.close() + reader.close() assertAll( - { assertEquals(emptyList(), scope.uncaughtExceptions, "No errors") }, { assertEquals(4197600, requestedWork, "Requested work does not match") }, { assertEquals(2157600, grantedWork, "Granted work does not match") }, { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, - { assertEquals(1200006, scope.currentTime) } + { assertEquals(1500001, currentTime) } ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 4f48bba7..40f50235 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -22,24 +22,19 @@ package org.opendc.experiments.capelin -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import mu.KotlinLogging import org.opendc.compute.api.* import 
org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostEvent import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader @@ -138,7 +133,7 @@ public fun createTraceReader( */ public suspend fun withComputeService( clock: Clock, - meter: Meter, + meterProvider: MeterProvider, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, block: suspend CoroutineScope.(ComputeService) -> Unit @@ -153,13 +148,15 @@ public suspend fun withComputeService( def.meta, coroutineContext, clock, + meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), def.powerModel ) } + val schedulerMeter = meterProvider.get("opendc-compute") val scheduler = - ComputeService(coroutineContext, clock, meter, allocationPolicy) + ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy) for (host in hosts) { scheduler.addHost(host) @@ -194,62 +191,13 @@ public suspend fun withMonitor( monitor.reportHostStateChange(clock.millis(), host, newState) } }) - - monitorJobs += host.events - .onEach { event -> - when (event) { - is HostEvent.SliceFinished -> monitor.reportHostSlice( - clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.driver - ) - } - } - .launchIn(this) - - monitorJobs += (host as SimHost).machine.powerDraw - .onEach { monitor.reportPowerConsumption(host, it) } - .launchIn(this) } val reader = 
CoroutineMetricReader( - this, listOf(metricProducer), - object : MetricExporter { - override fun export(metrics: Collection): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - - val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val runningVms = metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - - monitor.reportProvisionerMetrics( - clock.millis(), - hosts, - availableHosts, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - return CompletableResultCode.ofSuccess() - } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - }, - exportInterval = 5 * 60 * 1000 + this, + listOf(metricProducer), + ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }), + exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */ ) try { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 2921daba..5fa77161 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ 
b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -126,8 +126,6 @@ public abstract class Portfolio(name: String) : Experiment(name) { .setClock(clock.toOtelClock()) .build() - val meter = meterProvider.get("opendc-compute") - val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } @@ -153,7 +151,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - withComputeService(clock, meter, environment, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt new file mode 100644 index 00000000..799de60f --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.monitor + +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import org.opendc.compute.service.driver.Host +import java.time.Clock + +/** + * A [MetricExporter] that exports the metrics to the [ExperimentMonitor]. + */ +public class ExperimentMetricExporter( + private val monitor: ExperimentMonitor, + private val clock: Clock, + private val hosts: Map +) : MetricExporter { + override fun export(metrics: Collection): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + reportHostMetrics(metricsByName) + reportProvisionerMetrics(metricsByName) + return CompletableResultCode.ofSuccess() + } + + private fun reportHostMetrics(metrics: Map) { + val hostMetrics = mutableMapOf() + hosts.mapValuesTo(hostMetrics) { HostMetrics() } + + mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> + m.cpuDemand = v + } + + mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> + m.cpuUsage = v + } + + mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v -> + m.powerDraw = v + } + + mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v -> + m.requestedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v -> + m.grantedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> + m.overcommissionedBurst 
= v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v -> + m.interferedBurst = v.toLong() + } + + mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> + m.numberOfDeployedImages = v.toInt() + } + + for ((id, hostMetric) in hostMetrics) { + val host = hosts.getValue(id) + monitor.reportHostSlice( + clock.millis(), + hostMetric.requestedBurst, + hostMetric.grantedBurst, + hostMetric.overcommissionedBurst, + hostMetric.interferedBurst, + hostMetric.cpuUsage, + hostMetric.cpuDemand, + hostMetric.numberOfDeployedImages, + host + ) + + monitor.reportPowerConsumption(host, hostMetric.powerDraw) + } + } + + private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSummaryData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.sum) + } + } + } + + private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSumData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + + private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Long) -> Unit) { + val points = data?.longSumData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + + private fun reportProvisionerMetrics(metrics: Map) { + val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val unscheduledVms = 
metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + + monitor.reportProvisionerMetrics( + clock.millis(), + hosts, + availableHosts, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + } + + private class HostMetrics { + var requestedBurst: Long = 0 + var grantedBurst: Long = 0 + var overcommissionedBurst: Long = 0 + var interferedBurst: Long = 0 + var cpuUsage: Double = 0.0 + var cpuDemand: Double = 0.0 + var numberOfDeployedImages: Int = 0 + var powerDraw: Double = 0.0 + } + + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index a57c8d78..5e75c890 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -26,12 +26,11 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState -import java.io.Closeable /** * A monitor watches the events of an experiment. 
*/ -public interface ExperimentMonitor : Closeable { +public interface ExperimentMonitor : AutoCloseable { /** * This method is invoked when the state of a VM changes. */ diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index fd906f4d..02cfdc06 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer @@ -82,9 +81,7 @@ class CapelinIntegrationTest { .setClock(clock.toOtelClock()) .build() - val meter: Meter = meterProvider.get("opendc-compute") - - withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( @@ -142,9 +139,7 @@ class CapelinIntegrationTest { .setClock(clock.toOtelClock()) .build() - val meter: Meter = meterProvider.get("opendc-compute") - - withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { processTrace( clock, diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt 
b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 706efdc9..5b717ff7 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -34,7 +34,6 @@ import com.mongodb.client.MongoClients import com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters -import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer @@ -226,7 +225,6 @@ public class RunnerCli : CliktCommand(name = "runner") { .setClock(clock.toOtelClock()) .build() val metricProducer = meterProvider as MetricProducer - val meter: Meter = meterProvider.get("opendc-compute") val operational = scenario.get("operational", Document::class.java) val allocationPolicy = @@ -254,7 +252,7 @@ public class RunnerCli : CliktCommand(name = "runner") { val environment = TopologyParser(topologies, topologyId) val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7 - withComputeService(clock, meter, environment, allocationPolicy) { scheduler -> + withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> val failureDomain = if (failureFrequency > 0) { logger.debug { "ENABLING failures" } createFailureDomain( diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index 937b6966..f2eea97c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ 
b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -71,7 +71,7 @@ class SimResourceBenchmarks { fun benchmarkForwardOverhead(state: Workload) { return scope.runBlockingTest { val provider = SimResourceSource(4200.0, clock, scheduler) - val forwarder = SimResourceTransformer() + val forwarder = SimResourceForwarder() provider.startConsumer(forwarder) return@runBlockingTest forwarder.consume(state.consumers[0]) } diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt deleted file mode 100644 index 10f29f4e..00000000 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.utils.flow - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.consumeAsFlow - -/** - * A [Flow] that can be used to emit events. - */ -public interface EventFlow : Flow { - /** - * Emit the specified [event]. - */ - public fun emit(event: T) - - /** - * Close the flow. - */ - public fun close() -} - -/** - * Creates a new [EventFlow]. - */ -@Suppress("FunctionName") -public fun EventFlow(): EventFlow = EventFlowImpl() - -/** - * Internal implementation of the [EventFlow] class. - */ -@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -private class EventFlowImpl : EventFlow { - private var closed: Boolean = false - private val subscribers = mutableListOf>() - - override fun emit(event: T) { - if (closed) { - return - } - - val it = subscribers.iterator() - synchronized(this) { - while (it.hasNext()) { - val chan = it.next() - if (chan.isClosedForSend) { - it.remove() - } else { - chan.offer(event) - } - } - } - } - - override fun close() { - synchronized(this) { - closed = true - - for (chan in subscribers) { - chan.close() - } - - subscribers.clear() - } - } - - @InternalCoroutinesApi - override suspend fun collect(collector: FlowCollector) { - val channel: Channel - synchronized(this) { - if (closed) { - return - } - - channel = Channel(Channel.UNLIMITED) - subscribers.add(channel) - } - try { - channel.consumeAsFlow().collect(collector) - } finally { - channel.close() - } - } - - override fun toString(): String = "EventFlow" -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt 
b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt index e06e5eb3..46c0d835 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -78,6 +78,7 @@ internal class WorkflowServiceIntegrationTest { def.meta, coroutineContext, clock, + MeterProvider.noop().get("opendc-compute-simulator"), SimSpaceSharedHypervisorProvider() ) } -- cgit v1.2.3