diff options
72 files changed, 2435 insertions, 2337 deletions
diff --git a/simulator/buildSrc/src/main/kotlin/Versions.kt b/simulator/buildSrc/src/main/kotlin/Versions.kt index d1df6284..6aa9260b 100644 --- a/simulator/buildSrc/src/main/kotlin/Versions.kt +++ b/simulator/buildSrc/src/main/kotlin/Versions.kt @@ -49,6 +49,11 @@ public class Versions(private val project: Project) { val kotlinxCoroutines by version(name = "kotlinx-coroutines") + val otelApi by version(name = "opentelemetry-api") + val otelApiMetrics by version(name = "opentelemetry-api-metrics") + val otelSdk by version(name = "opentelemetry-sdk") + val otelSdkMetrics by version(name = "opentelemetry-sdk-metrics") + /** * Obtain the version for the specified [dependency][name]. diff --git a/simulator/gradle.properties b/simulator/gradle.properties index 8d41408c..99b08bb2 100644 --- a/simulator/gradle.properties +++ b/simulator/gradle.properties @@ -28,6 +28,12 @@ kotlin-logging.version = 2.0.6 slf4j.version = 1.7.30 log4j.version = 2.14.1 +# Dependencies (Telemetry) +opentelemetry-api.version = 1.0.1 +opentelemetry-api-metrics.version = 1.0.1-alpha +opentelemetry-sdk.version = 1.0.1 +opentelemetry-sdk-metrics.version = 1.0.1-alpha + # Dependencies (CLI) clikt.version = 3.1.0 progressbar.version = 0.9.0 diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts index 1b09ef6d..909e2dcd 100644 --- a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts @@ -26,15 +26,16 @@ description = "OpenDC Compute Service implementation" plugins { `kotlin-library-conventions` `testing-conventions` + `jacoco-conventions` } dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-trace:opendc-trace-core")) + api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) - testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 593e4b56..98566da3 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -22,12 +22,11 @@ package org.opendc.compute.service -import kotlinx.coroutines.flow.Flow +import io.opentelemetry.api.metrics.Meter import org.opendc.compute.api.ComputeClient import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.trace.core.EventTracer import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -36,11 +35,6 @@ import kotlin.coroutines.CoroutineContext */ public interface ComputeService : AutoCloseable { /** - * The events emitted by the service. - */ - public val events: Flow<ComputeServiceEvent> - - /** * The hosts that are used by the compute service. */ public val hosts: Set<Host> @@ -76,17 +70,16 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param tracer The event tracer to use. * @param allocationPolicy The allocation policy to use. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - tracer: EventTracer, + meter: Meter, allocationPolicy: AllocationPolicy, schedulingQuantum: Long = 300000, ): ComputeService { - return ComputeServiceImpl(context, clock, tracer, allocationPolicy, schedulingQuantum) + return ComputeServiceImpl(context, clock, meter, allocationPolicy, schedulingQuantum) } } } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt deleted file mode 100644 index 193008a7..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service - -/** - * An event that is emitted by the [ComputeService]. - */ -public sealed class ComputeServiceEvent { - /** - * The service that has emitted the event. - */ - public abstract val provisioner: ComputeService - - /** - * An event emitted for writing metrics. - */ - public data class MetricsAvailable( - override val provisioner: ComputeService, - public val totalHostCount: Int, - public val availableHostCount: Int, - public val totalVmCount: Int, - public val activeVmCount: Int, - public val inactiveVmCount: Int, - public val waitingVmCount: Int, - public val failedVmCount: Int - ) : ComputeServiceEvent() -} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index c3c39572..bed15dfd 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -22,7 +22,6 @@ package org.opendc.compute.service.driver -import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.Server import java.util.* @@ -56,11 +55,6 @@ public interface Host { public val meta: Map<String, Any> /** - * The events emitted by the driver. - */ - public val events: Flow<HostEvent> - - /** * Determine whether the specified [instance][server] can still fit on this host. */ public fun canFit(server: Server): Boolean diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt deleted file mode 100644 index 97350679..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.driver - -/** - * An event that is emitted by a [Host]. - */ -public sealed class HostEvent { - /** - * The driver that emitted the event. - */ - public abstract val driver: Host - - /** - * This event is emitted when the number of active servers on the server managed by this driver is updated. - * - * @property driver The driver that emitted the event. - * @property numberOfActiveServers The number of active servers. - * @property availableMemory The available memory, in MB. - */ - public data class VmsUpdated( - override val driver: Host, - public val numberOfActiveServers: Int, - public val availableMemory: Long - ) : HostEvent() - - /** - * This event is emitted when a slice is finished. - * - * @property driver The driver that emitted the event. - * @property requestedBurst The total requested CPU time (can be above capacity). - * @property grantedBurst The actual total granted capacity, which might be lower than the requested burst due to - * the hypervisor being interrupted during a slice. - * @property overcommissionedBurst The CPU time that the hypervisor could not grant to the virtual machine since - * it did not have the capacity. - * @property interferedBurst The sum of CPU time that virtual machines could not utilize due to performance - * interference. - * @property cpuUsage CPU use in megahertz. - * @property cpuDemand CPU demand in megahertz. - * @property numberOfDeployedImages The number of images deployed on this hypervisor. - */ - public data class SliceFinished( - override val driver: Host, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val numberOfDeployedImages: Int, - ) : HostEvent() -} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt deleted file mode 100644 index a7974062..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event -import java.util.* - -/** - * This event is emitted when a hypervisor has become available. - */ -public class HypervisorAvailableEvent(public val uid: UUID) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt deleted file mode 100644 index 75bb09ed..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event -import java.util.* - -/** - * This event is emitted when a hypervisor has become unavailable. - */ -public class HypervisorUnavailableEvent(public val uid: UUID) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt deleted file mode 100644 index f59c74b7..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event - -/** - * This event is emitted when a virtual machine has successfully been scheduled on a hypervisor. - */ -public class VmScheduledEvent(public val name: String) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt deleted file mode 100644 index eaf0736b..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event - -/** - * This event is emitted when a virtual machine has stopped running. - */ -public class VmStoppedEvent(public val name: String) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt deleted file mode 100644 index fa0a8a13..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.trace.core.Event - -/** - * This event is emitted when a virtual machine is submitted to the provisioning service. - */ -public class VmSubmissionEvent(public val name: String, public val image: Image, public val flavor: Flavor) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt deleted file mode 100644 index 52b91616..00000000 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.service.events - -import org.opendc.trace.core.Event - -/** - * An event that is emitted when the submission is deemed to be invalid. - */ -public class VmSubmissionInvalidEvent(public val name: String) : Event() diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt index 29f10e27..4a8d3046 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt @@ -59,4 +59,10 @@ internal class ClientFlavor(private val delegate: Flavor) : Flavor { labels = delegate.labels meta = delegate.meta } + + override fun equals(other: Any?): Boolean = other is Flavor && other.uid == uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Flavor[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt index 6c5b2ab0..e0b5c171 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt @@ -52,4 +52,10 @@ internal class ClientImage(private val delegate: Image) : Image { labels = delegate.labels meta = delegate.meta } + + override fun equals(other: Any?): Boolean = other is Image && other.uid == uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Image[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index ae4cee3b..f2929bf3 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -104,4 +104,10 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche watcher.onStateChanged(this, newState) } } + + override fun equals(other: Any?): Boolean = other is Server && other.uid == uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Server[uid=$uid,name=$name,state=$state]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index aa7e0aa1..26a34ad9 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,20 +22,16 @@ package org.opendc.compute.service.internal +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.events.* import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.trace.core.EventTracer import org.opendc.utils.TimerScheduler -import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext @@ -47,17 +43,17 @@ import kotlin.math.max * @param context The [CoroutineContext] to use. * @param clock The clock instance to keep track of time. */ -public class ComputeServiceImpl( +internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - private val tracer: EventTracer, + private val meter: Meter, private val allocationPolicy: AllocationPolicy, private val schedulingQuantum: Long ) : ComputeService, HostListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - private val scope = CoroutineScope(context) + private val scope = CoroutineScope(context + Job()) /** * The logger instance of this server. @@ -104,24 +100,70 @@ public class ComputeServiceImpl( */ private val servers = mutableMapOf<UUID, InternalServer>() - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var finishedVms: Int = 0 - public var unscheduledVms: Int = 0 - private var maxCores = 0 private var maxMemory = 0L /** + * The number of servers that have been submitted to the service for provisioning. + */ + private val _submittedServers = meter.longCounterBuilder("servers.submitted") + .setDescription("Number of start requests") + .setUnit("1") + .build() + + /** + * The number of servers that failed to be scheduled. + */ + private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled") + .setDescription("Number of unscheduled servers") + .setUnit("1") + .build() + + /** + * The number of servers that are waiting to be provisioned. + */ + private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting") + .setDescription("Number of servers waiting to be provisioned") + .setUnit("1") + .build() + + /** + * The number of servers that are waiting to be provisioned. + */ + private val _runningServers = meter.longUpDownCounterBuilder("servers.active") + .setDescription("Number of servers currently running") + .setUnit("1") + .build() + + /** + * The number of servers that have finished running. + */ + private val _finishedServers = meter.longCounterBuilder("servers.finished") + .setDescription("Number of servers that finished running") + .setUnit("1") + .build() + + /** + * The number of hosts registered at the compute service. + */ + private val _hostCount = meter.longUpDownCounterBuilder("hosts.total") + .setDescription("Number of hosts") + .setUnit("1") + .build() + + /** + * The number of available hosts registered at the compute service. + */ + private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available") + .setDescription("Number of available hosts") + .setUnit("1") + .build() + + /** * The allocation logic to use. */ private val allocationLogic = allocationPolicy() - override val events: Flow<ComputeServiceEvent> - get() = _events - private val _events = EventFlow<ComputeServiceEvent>() - /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ @@ -133,130 +175,119 @@ public class ComputeServiceImpl( override val hostCount: Int get() = hostToView.size - override fun newClient(): ComputeClient = object : ComputeClient { - private var isClosed: Boolean = false + override fun newClient(): ComputeClient { + check(scope.isActive) { "Service is already closed" } + return object : ComputeClient { + private var isClosed: Boolean = false - override suspend fun queryFlavors(): List<Flavor> { - check(!isClosed) { "Client is already closed" } + override suspend fun queryFlavors(): List<Flavor> { + check(!isClosed) { "Client is already closed" } - return flavors.values.map { ClientFlavor(it) } - } + return flavors.values.map { ClientFlavor(it) } + } - override suspend fun findFlavor(id: UUID): Flavor? { - check(!isClosed) { "Client is already closed" } + override suspend fun findFlavor(id: UUID): Flavor? { + check(!isClosed) { "Client is already closed" } - return flavors[id]?.let { ClientFlavor(it) } - } + return flavors[id]?.let { ClientFlavor(it) } + } - override suspend fun newFlavor( - name: String, - cpuCount: Int, - memorySize: Long, - labels: Map<String, String>, - meta: Map<String, Any> - ): Flavor { - check(!isClosed) { "Client is already closed" } - - val uid = UUID(clock.millis(), random.nextLong()) - val flavor = InternalFlavor( - this@ComputeServiceImpl, - uid, - name, - cpuCount, - memorySize, - labels, - meta - ) - - flavors[uid] = flavor - - return ClientFlavor(flavor) - } + override suspend fun newFlavor( + name: String, + cpuCount: Int, + memorySize: Long, + labels: Map<String, String>, + meta: Map<String, Any> + ): Flavor { + check(!isClosed) { "Client is already closed" } + + val uid = UUID(clock.millis(), random.nextLong()) + val flavor = InternalFlavor( + this@ComputeServiceImpl, + uid, + name, + cpuCount, + memorySize, + labels, + meta + ) - override suspend fun queryImages(): List<Image> { - check(!isClosed) { "Client is already closed" } + flavors[uid] = flavor - return images.values.map { ClientImage(it) } - } + return ClientFlavor(flavor) + } - override suspend fun findImage(id: UUID): Image? { - check(!isClosed) { "Client is already closed" } + override suspend fun queryImages(): List<Image> { + check(!isClosed) { "Client is already closed" } - return images[id]?.let { ClientImage(it) } - } + return images.values.map { ClientImage(it) } + } - override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image { - check(!isClosed) { "Client is already closed" } + override suspend fun findImage(id: UUID): Image? { + check(!isClosed) { "Client is already closed" } - val uid = UUID(clock.millis(), random.nextLong()) - val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) + return images[id]?.let { ClientImage(it) } + } - images[uid] = image + override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image { + check(!isClosed) { "Client is already closed" } - return ClientImage(image) - } + val uid = UUID(clock.millis(), random.nextLong()) + val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) - override suspend fun newServer( - name: String, - image: Image, - flavor: Flavor, - labels: Map<String, String>, - meta: Map<String, Any>, - start: Boolean - ): Server { - check(!isClosed) { "Client is closed" } - tracer.commit(VmSubmissionEvent(name, image, flavor)) - - _events.emit( - ComputeServiceEvent.MetricsAvailable( + images[uid] = image + + return ClientImage(image) + } + + override suspend fun newServer( + name: String, + image: Image, + flavor: Flavor, + labels: Map<String, String>, + meta: Map<String, Any>, + start: Boolean + ): Server { + check(!isClosed) { "Client is closed" } + + val uid = UUID(clock.millis(), random.nextLong()) + val server = InternalServer( this@ComputeServiceImpl, - hostCount, - availableHosts.size, - ++submittedVms, - runningVms, - finishedVms, - ++queuedVms, - unscheduledVms + uid, + name, + requireNotNull(flavors[flavor.uid]) { "Unknown flavor" }, + requireNotNull(images[image.uid]) { "Unknown image" }, + labels.toMutableMap(), + meta.toMutableMap() ) - ) - - val uid = UUID(clock.millis(), random.nextLong()) - val server = InternalServer( - this@ComputeServiceImpl, - uid, - name, - flavor, - image, - labels.toMutableMap(), - meta.toMutableMap() - ) - - servers[uid] = server - - if (start) { - server.start() + + servers[uid] = server + + if (start) { + server.start() + } + + return ClientServer(server) } - return ClientServer(server) - } + override suspend fun findServer(id: UUID): Server? { + check(!isClosed) { "Client is already closed" } - override suspend fun findServer(id: UUID): Server? { - check(!isClosed) { "Client is already closed" } + return servers[id]?.let { ClientServer(it) } + } - return servers[id]?.let { ClientServer(it) } - } + override suspend fun queryServers(): List<Server> { + check(!isClosed) { "Client is already closed" } - override suspend fun queryServers(): List<Server> { - check(!isClosed) { "Client is already closed" } + return servers.values.map { ClientServer(it) } + } - return servers.values.map { ClientServer(it) } - } + override fun close() { + isClosed = true + } - override fun close() { - isClosed = true + override fun toString(): String = "ComputeClient" } - - override fun toString(): String = "ComputeClient" } override fun addHost(host: Host) { @@ -271,37 +302,50 @@ public class ComputeServiceImpl( hostToView[host] = hv if (host.state == HostState.UP) { + _availableHostCount.add(1) availableHosts += hv } + _hostCount.add(1) host.addListener(this) } override fun removeHost(host: Host) { - host.removeListener(this) + val view = hostToView.remove(host) + if (view != null) { + if (availableHosts.remove(view)) { + _availableHostCount.add(-1) + } + host.removeListener(this) + _hostCount.add(-1) + } } override fun close() { scope.cancel() } - internal fun schedule(server: InternalServer) { + internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } - queue.add(SchedulingRequest(server)) + val request = SchedulingRequest(server) + queue.add(request) + _submittedServers.add(1) + _waitingServers.add(1) requestSchedulingCycle() + return request } internal fun delete(flavor: InternalFlavor) { - checkNotNull(flavors.remove(flavor.uid)) { "Flavor was not known" } + flavors.remove(flavor.uid) } internal fun delete(image: InternalImage) { - checkNotNull(images.remove(image.uid)) { "Image was not known" } + images.remove(image.uid) } internal fun delete(server: InternalServer) { - checkNotNull(servers.remove(server.uid)) { "Server was not known" } + servers.remove(server.uid) } /** @@ -332,34 +376,24 @@ public class ComputeServiceImpl( if (request.isCancelled) { queue.poll() + _waitingServers.add(-1) continue } val server = request.server val hv = allocationLogic.select(availableHosts, request.server) if (hv == null || !hv.host.canFit(server)) { - logger.trace { "Server $server selected for scheduling but no capacity available for it." } + logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" } if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { - tracer.commit(VmSubmissionInvalidEvent(server.name)) - - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - runningVms, - finishedVms, - --queuedVms, - ++unscheduledVms - ) - ) - // Remove the incoming image queue.poll() + _waitingServers.add(-1) + _unscheduledServers.add(1) logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") + + server.state = ServerState.ERROR continue } else { break @@ -370,44 +404,28 @@ public class ComputeServiceImpl( // Remove request from queue queue.poll() + _waitingServers.add(-1) logger.info { "Assigned server $server to host $host." } - try { - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - hv.numberOfActiveServers++ - hv.provisionedCores += server.flavor.cpuCount - hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack - - scope.launch { - try { - server.assignHost(host) - host.spawn(server) - activeServers[server] = host - - tracer.commit(VmScheduledEvent(server.name)) - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) - ) - } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) - - hv.numberOfActiveServers-- - hv.provisionedCores -= server.flavor.cpuCount - hv.availableMemory += server.flavor.memorySize - } + + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + hv.numberOfActiveServers++ + hv.provisionedCores += server.flavor.cpuCount + hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack + + scope.launch { + try { + server.host = host + host.spawn(server) + activeServers[server] = host + } catch (e: Throwable) { + logger.error("Failed to deploy VM", e) + + hv.numberOfActiveServers-- + hv.provisionedCores -= server.flavor.cpuCount + hv.availableMemory += server.flavor.memorySize } - } catch (e: Exception) { - logger.warn(e) { "Failed to assign server $server to $host. " } } } } @@ -415,7 +433,7 @@ public class ComputeServiceImpl( /** * A request to schedule an [InternalServer] onto one of the [Host]s. */ - private data class SchedulingRequest(val server: InternalServer) { + internal data class SchedulingRequest(val server: InternalServer) { /** * A flag to indicate that the request is cancelled. */ @@ -431,23 +449,9 @@ public class ComputeServiceImpl( if (hv != null) { // Corner case for when the hypervisor already exists availableHosts += hv + _availableHostCount.add(1) } - tracer.commit(HypervisorAvailableEvent(host.uid)) - - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) - // Re-schedule on the new machine requestSchedulingCycle() } @@ -456,21 +460,7 @@ public class ComputeServiceImpl( val hv = hostToView[host] ?: return availableHosts -= hv - - tracer.commit(HypervisorUnavailableEvent(hv.uid)) - - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - ) + _availableHostCount.add(-1) requestSchedulingCycle() } @@ -488,25 +478,15 @@ public class ComputeServiceImpl( server.state = newState - if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { + if (newState == ServerState.RUNNING) { + _runningServers.add(1) + } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } - tracer.commit(VmStoppedEvent(server.name)) - - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - ) - ) - activeServers -= server + _runningServers.add(-1) + _finishedServers.add(1) + val hv = hostToView[host] if (hv != null) { hv.provisionedCores -= server.flavor.cpuCount diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt index 1bdfdf1a..5793541f 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt @@ -32,4 +32,6 @@ public class HostView(public val host: Host) { public var numberOfActiveServers: Int = 0 public var availableMemory: Long = host.model.memorySize public var provisionedCores: Int = 0 + + override fun toString(): String = "HostView[host=$host]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt index 95e280df..b8fb6279 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt @@ -58,7 +58,9 @@ internal class InternalFlavor( service.delete(this) } - override fun equals(other: Any?): Boolean = other is InternalFlavor && uid == other.uid + override fun equals(other: Any?): Boolean = other is Flavor && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Flavor[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt index 86f2f6b9..d9ed5896 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt @@ -48,7 +48,9 @@ internal class InternalImage( service.delete(this) } - override fun equals(other: Any?): Boolean = other is InternalImage && uid == other.uid + override fun equals(other: Any?): Boolean = other is Image && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Image[uid=$uid,name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index ff7c1d15..d9d0f3fc 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -34,8 +34,8 @@ internal class InternalServer( private val service: ComputeServiceImpl, override val uid: UUID, override val name: String, - override val flavor: Flavor, - override val image: Image, + override val flavor: InternalFlavor, + override val image: InternalImage, override val labels: MutableMap<String, String>, override val meta: MutableMap<String, Any> ) : Server { @@ -54,6 +54,11 @@ internal class InternalServer( */ internal var host: Host? = null + /** + * The current scheduling request. + */ + private var request: ComputeServiceImpl.SchedulingRequest? = null + override suspend fun start() { when (state) { ServerState.RUNNING -> { @@ -66,35 +71,43 @@ internal class InternalServer( } ServerState.DELETED -> { logger.warn { "User tried to start terminated server" } - throw IllegalArgumentException("Server is terminated") + throw IllegalStateException("Server is terminated") } else -> { logger.info { "User requested to start server $uid" } state = ServerState.PROVISIONING - service.schedule(this) + assert(request == null) { "Scheduling request already active" } + request = service.schedule(this) } } } override suspend fun stop() { when (state) { - ServerState.PROVISIONING -> {} // TODO Find way to interrupt these + ServerState.PROVISIONING -> { + cancelProvisioningRequest() + state = ServerState.TERMINATED + } ServerState.RUNNING, ServerState.ERROR -> { val host = checkNotNull(host) { "Server not running" } host.stop(this) } - ServerState.TERMINATED -> {} // No work needed - ServerState.DELETED -> throw IllegalStateException("Server is terminated") + ServerState.TERMINATED, ServerState.DELETED -> {} // No work needed } } override suspend fun delete() { when (state) { - ServerState.PROVISIONING -> {} // TODO Find way to interrupt these - ServerState.RUNNING -> { + ServerState.PROVISIONING, ServerState.TERMINATED -> { + cancelProvisioningRequest() + service.delete(this) + state = ServerState.DELETED + } + ServerState.RUNNING, ServerState.ERROR -> { val host = checkNotNull(host) { "Server not running" } host.delete(this) service.delete(this) + state = ServerState.DELETED } else -> {} // No work needed } @@ -121,11 +134,20 @@ internal class InternalServer( field = value } - internal fun assignHost(host: Host) { - this.host = host + /** + * Cancel the provisioning request if active. + */ + private fun cancelProvisioningRequest() { + val request = request + if (request != null) { + this.request = null + request.isCancelled = true + } } - override fun equals(other: Any?): Boolean = other is InternalServer && uid == other.uid + override fun equals(other: Any?): Boolean = other is Server && uid == other.uid override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Server[uid=$uid,state=$state]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt index ed1dc662..2c953f8b 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt @@ -20,14 +20,11 @@ * SOFTWARE. */ -package org.opendc.compute.simulator.allocation +package org.opendc.compute.service.scheduler import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.service.internal.HostView -import org.opendc.compute.service.scheduler.AllocationPolicy - -private val logger = KotlinLogging.logger {} /** * Policy replaying VM-cluster assignment. @@ -36,6 +33,8 @@ private val logger = KotlinLogging.logger {} * assigned the VM image. */ public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String>) : AllocationPolicy { + private val logger = KotlinLogging.logger {} + override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( hypervisors: Set<HostView>, diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt new file mode 100644 index 00000000..45a306aa --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -0,0 +1,390 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import io.mockk.* +import io.opentelemetry.api.metrics.MeterProvider +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.compute.api.* +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener +import org.opendc.compute.service.driver.HostModel +import org.opendc.compute.service.driver.HostState +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.simulator.utils.DelayControllerClockAdapter +import java.util.* + +/** + * Test suite for the [ComputeService] interface. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class ComputeServiceTest { + lateinit var scope: TestCoroutineScope + lateinit var service: ComputeService + + @BeforeEach + fun setUp() { + scope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(scope) + val policy = AvailableMemoryAllocationPolicy() + val meter = MeterProvider.noop().get("opendc-compute") + service = ComputeService(scope.coroutineContext, clock, meter, policy) + } + + @AfterEach + fun tearDown() { + scope.cleanupTestCoroutines() + } + + @Test + fun testClientClose() = scope.runBlockingTest { + val client = service.newClient() + + assertEquals(emptyList<Flavor>(), client.queryFlavors()) + assertEquals(emptyList<Image>(), client.queryImages()) + assertEquals(emptyList<Server>(), client.queryServers()) + + client.close() + + assertThrows<IllegalStateException> { client.queryFlavors() } + assertThrows<IllegalStateException> { client.queryImages() } + assertThrows<IllegalStateException> { client.queryServers() } + + assertThrows<IllegalStateException> { client.findFlavor(UUID.randomUUID()) } + assertThrows<IllegalStateException> { client.findImage(UUID.randomUUID()) } + assertThrows<IllegalStateException> { client.findServer(UUID.randomUUID()) } + + assertThrows<IllegalStateException> { client.newFlavor("test", 1, 2) } + assertThrows<IllegalStateException> { client.newImage("test") } + assertThrows<IllegalStateException> { client.newServer("test", mockk(), mockk()) } + } + + @Test + fun testClientCreate() = scope.runBlockingTest { + val client = service.newClient() + + val flavor = client.newFlavor("test", 1, 1024) + assertEquals(listOf(flavor), client.queryFlavors()) + assertEquals(flavor, client.findFlavor(flavor.uid)) + val image = client.newImage("test") + assertEquals(listOf(image), client.queryImages()) + assertEquals(image, client.findImage(image.uid)) + val server = client.newServer("test", image, flavor, start = false) + assertEquals(listOf(server), client.queryServers()) + assertEquals(server, client.findServer(server.uid)) + + server.delete() + assertNull(client.findServer(server.uid)) + + image.delete() + assertNull(client.findImage(image.uid)) + + flavor.delete() + assertNull(client.findFlavor(flavor.uid)) + + assertThrows<IllegalStateException> { server.start() } + } + + @Test + fun testClientOnClose() = scope.runBlockingTest { + service.close() + assertThrows<IllegalStateException> { + service.newClient() + } + } + + @Test + fun testAddHost() = scope.runBlockingTest { + val host = mockk<Host>(relaxUnitFun = true) + + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + + assertEquals(0, service.hostCount) + assertEquals(emptySet<Host>(), service.hosts) + + service.addHost(host) + + verify(exactly = 1) { host.addListener(any()) } + + assertEquals(1, service.hostCount) + assertEquals(1, service.hosts.size) + + service.removeHost(host) + + verify(exactly = 1) { host.removeListener(any()) } + } + + @Test + fun testAddHostDouble() = scope.runBlockingTest { + val host = mockk<Host>(relaxUnitFun = true) + + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.DOWN + + assertEquals(0, service.hostCount) + assertEquals(emptySet<Host>(), service.hosts) + + service.addHost(host) + service.addHost(host) + + verify(exactly = 1) { host.addListener(any()) } + } + + @Test + fun testServerStartWithoutEnoughCpus() = scope.runBlockingTest { + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 0) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.ERROR, server.state) + } + + @Test + fun testServerStartWithoutEnoughMemory() = scope.runBlockingTest { + val client = service.newClient() + val flavor = client.newFlavor("test", 0, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.ERROR, server.state) + } + + @Test + fun testServerStartWithoutEnoughResources() = scope.runBlockingTest { + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.ERROR, server.state) + } + + @Test + fun testServerCancelRequest() = scope.runBlockingTest { + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + server.stop() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.TERMINATED, server.state) + } + + @Test + fun testServerCannotFitOnHost() = scope.runBlockingTest { + val host = mockk<Host>(relaxUnitFun = true) + + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns false + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(10 * 60 * 1000) + server.refresh() + assertEquals(ServerState.PROVISIONING, server.state) + + verify { host.canFit(server) } + } + + @Test + fun testHostAvailableAfterSomeTime() = scope.runBlockingTest { + val host = mockk<Host>(relaxUnitFun = true) + val listeners = mutableListOf<HostListener>() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.DOWN + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + every { host.canFit(any()) } returns false + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + + listeners.forEach { it.onStateChanged(host, HostState.UP) } + + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.PROVISIONING, server.state) + + verify { host.canFit(server) } + } + + @Test + fun testHostUnavailableAfterSomeTime() = scope.runBlockingTest { + val host = mockk<Host>(relaxUnitFun = true) + val listeners = mutableListOf<HostListener>() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + every { host.canFit(any()) } returns false + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + delay(5 * 60 * 1000) + + listeners.forEach { it.onStateChanged(host, HostState.DOWN) } + + server.start() + delay(5 * 60 * 1000) + server.refresh() + assertEquals(ServerState.PROVISIONING, server.state) + + verify(exactly = 0) { host.canFit(server) } + } + + @Test + fun testServerInvalidType() = scope.runBlockingTest { + val host = mockk<Host>(relaxUnitFun = true) + val listeners = mutableListOf<HostListener>() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns true + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + assertThrows<IllegalArgumentException> { + listeners.forEach { it.onStateChanged(host, server, ServerState.RUNNING) } + } + } + + @Test + fun testServerDeploy() = scope.runBlockingTest { + val host = mockk<Host>(relaxUnitFun = true) + val listeners = mutableListOf<HostListener>() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns true + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + val slot = slot<Server>() + + val watcher = mockk<ServerWatcher>(relaxUnitFun = true) + server.watch(watcher) + + // Start server + server.start() + delay(5 * 60 * 1000) + coVerify { host.spawn(capture(slot), true) } + + listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) } + + server.refresh() + assertEquals(ServerState.RUNNING, server.state) + + verify { watcher.onStateChanged(server, ServerState.RUNNING) } + + // Stop server + listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.TERMINATED) } + + server.refresh() + assertEquals(ServerState.TERMINATED, server.state) + + verify { watcher.onStateChanged(server, ServerState.TERMINATED) } + } + + @Test + fun testServerDeployFailure() = scope.runBlockingTest { + val host = mockk<Host>(relaxUnitFun = true) + val listeners = mutableListOf<HostListener>() + + every { host.uid } returns UUID.randomUUID() + every { host.model } returns HostModel(4, 2048) + every { host.state } returns HostState.UP + every { host.canFit(any()) } returns true + every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } + coEvery { host.spawn(any(), true) } throws IllegalStateException() + + service.addHost(host) + + val client = service.newClient() + val flavor = client.newFlavor("test", 1, 1024) + val image = client.newImage("test") + val server = client.newServer("test", image, flavor, start = false) + + server.start() + delay(5 * 60 * 1000) + + server.refresh() + assertEquals(ServerState.PROVISIONING, server.state) + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt new file mode 100644 index 00000000..18d698c6 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import io.mockk.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Test +import org.opendc.compute.api.Flavor +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.internal.InternalFlavor +import java.util.* + +/** + * Test suite for the [InternalFlavor] implementation. + */ +class InternalFlavorTest { + @Test + fun testEquality() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + val b = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + + assertEquals(a, b) + } + + @Test + fun testEqualityWithDifferentType() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + + val b = mockk<Flavor>(relaxUnitFun = true) + every { b.uid } returns uid + + assertEquals(a, b) + } + + @Test + fun testInequalityWithDifferentType() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + + val b = mockk<Flavor>(relaxUnitFun = true) + every { b.uid } returns UUID.randomUUID() + + assertNotEquals(a, b) + } + + @Test + fun testInequalityWithIncorrectType() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf()) + + assertNotEquals(a, Unit) + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt new file mode 100644 index 00000000..e1cb0128 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import io.mockk.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Test +import org.opendc.compute.api.Image +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.internal.InternalFlavor +import org.opendc.compute.service.internal.InternalImage +import java.util.* + +/** + * Test suite for the [InternalFlavor] implementation. + */ +class InternalImageTest { + @Test + fun testEquality() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + val b = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + + assertEquals(a, b) + } + + @Test + fun testEqualityWithDifferentType() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + + val b = mockk<Image>(relaxUnitFun = true) + every { b.uid } returns uid + + assertEquals(a, b) + } + + @Test + fun testInequalityWithDifferentType() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + + val b = mockk<Image>(relaxUnitFun = true) + every { b.uid } returns UUID.randomUUID() + + assertNotEquals(a, b) + } + + @Test + fun testInequalityWithIncorrectType() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf()) + + assertNotEquals(a, Unit) + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt new file mode 100644 index 00000000..81cb45df --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service + +import io.mockk.* +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.internal.InternalFlavor +import org.opendc.compute.service.internal.InternalImage +import org.opendc.compute.service.internal.InternalServer +import java.util.* + +/** + * Test suite for the [InternalServer] implementation. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class InternalServerTest { + @Test + fun testEquality() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + assertEquals(a, b) + } + + @Test + fun testEqualityWithDifferentType() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + val b = mockk<Server>(relaxUnitFun = true) + every { b.uid } returns uid + + assertEquals(a, b) + } + + @Test + fun testInequalityWithDifferentType() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + val b = mockk<Server>(relaxUnitFun = true) + every { b.uid } returns UUID.randomUUID() + + assertNotEquals(a, b) + } + + @Test + fun testInequalityWithIncorrectType() { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + assertNotEquals(a, Unit) + } + + @Test + fun testStartTerminatedServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) } + + server.start() + + verify(exactly = 1) { service.schedule(server) } + assertEquals(ServerState.PROVISIONING, server.state) + } + + @Test + fun testStartDeletedServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.DELETED + + assertThrows<IllegalStateException> { server.start() } + } + + @Test + fun testStartProvisioningServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.PROVISIONING + + server.start() + + assertEquals(ServerState.PROVISIONING, server.state) + } + + @Test + fun testStartRunningServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.RUNNING + + server.start() + + assertEquals(ServerState.RUNNING, server.state) + } + + @Test + fun testStopProvisioningServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val request = ComputeServiceImpl.SchedulingRequest(server) + + every { service.schedule(any()) } returns request + + server.start() + server.stop() + + assertTrue(request.isCancelled) + assertEquals(ServerState.TERMINATED, server.state) + } + + @Test + fun testStopTerminatedServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.TERMINATED + server.stop() + + assertEquals(ServerState.TERMINATED, server.state) + } + + @Test + fun testStopDeletedServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.DELETED + server.stop() + + assertEquals(ServerState.DELETED, server.state) + } + + @Test + fun testStopRunningServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>() + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val host = mockk<Host>(relaxUnitFun = true) + + server.state = ServerState.RUNNING + server.host = host + server.stop() + yield() + + coVerify { host.stop(server) } + } + + @Test + fun testDeleteProvisioningServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val request = ComputeServiceImpl.SchedulingRequest(server) + + every { service.schedule(any()) } returns request + + server.start() + server.delete() + + assertTrue(request.isCancelled) + assertEquals(ServerState.DELETED, server.state) + verify { service.delete(server) } + } + + @Test + fun testDeleteTerminatedServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.TERMINATED + server.delete() + + assertEquals(ServerState.DELETED, server.state) + + verify { service.delete(server) } + } + + @Test + fun testDeleteDeletedServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + + server.state = ServerState.DELETED + server.delete() + + assertEquals(ServerState.DELETED, server.state) + } + + @Test + fun testDeleteRunningServer() = runBlockingTest { + val service = mockk<ComputeServiceImpl>(relaxUnitFun = true) + val uid = UUID.randomUUID() + val flavor = mockk<InternalFlavor>() + val image = mockk<InternalImage>() + val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf()) + val host = mockk<Host>(relaxUnitFun = true) + + server.state = ServerState.RUNNING + server.host = host + server.delete() + yield() + + coVerify { host.delete(server) } + verify { service.delete(server) } + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt new file mode 100644 index 00000000..db377914 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView +import java.util.* +import java.util.stream.Stream +import kotlin.random.Random + +/** + * Test suite for the [AllocationPolicy] interface. + */ +internal class AllocationPolicyTest { + @ParameterizedTest + @MethodSource("activeServersArgs") + fun testActiveServersPolicy( + reversed: Boolean, + hosts: Set<HostView>, + server: Server, + expectedHost: HostView? + ) { + val policy = NumberOfActiveServersAllocationPolicy(reversed) + assertEquals(expectedHost, policy.invoke().select(hosts, server)) + } + + @ParameterizedTest + @MethodSource("availableMemoryArgs") + fun testAvailableMemoryPolicy( + reversed: Boolean, + hosts: Set<HostView>, + server: Server, + expectedHost: HostView? + ) { + val policy = AvailableMemoryAllocationPolicy(reversed) + assertEquals(expectedHost, policy.invoke().select(hosts, server)) + } + + @ParameterizedTest + @MethodSource("availableCoreMemoryArgs") + fun testAvailableCoreMemoryPolicy( + reversed: Boolean, + hosts: Set<HostView>, + server: Server, + expectedHost: HostView? + ) { + val policy = AvailableMemoryAllocationPolicy(reversed) + assertEquals(expectedHost, policy.invoke().select(hosts, server)) + } + + @ParameterizedTest + @MethodSource("provisionedCoresArgs") + fun testProvisionedPolicy( + reversed: Boolean, + hosts: Set<HostView>, + server: Server, + expectedHost: HostView? + ) { + val policy = ProvisionedCoresAllocationPolicy(reversed) + assertEquals(expectedHost, policy.invoke().select(hosts, server)) + } + + @Suppress("unused") + private companion object { + /** + * Test arguments for the [NumberOfActiveServersAllocationPolicy]. + */ + @JvmStatic + fun activeServersArgs(): Stream<Arguments> { + val random = Random(1) + val hosts = List(4) { i -> + val view = mockk<HostView>() + every { view.host.uid } returns UUID(0, i.toLong()) + every { view.host.model.cpuCount } returns random.nextInt(1, 16) + every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024) + every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize) + every { view.numberOfActiveServers } returns random.nextInt(0, 6) + every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount) + every { view.toString() } returns "HostView[$i,numberOfActiveServers=${view.numberOfActiveServers}]" + view + } + + val servers = List(2) { + val server = mockk<Server>() + every { server.flavor.cpuCount } returns random.nextInt(1, 8) + every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512) + server + } + + return Stream.of( + Arguments.of(false, hosts.toSet(), servers[0], hosts[2]), + Arguments.of(false, hosts.toSet(), servers[1], hosts[1]), + Arguments.of(true, hosts.toSet(), servers[1], hosts[0]), + ) + } + + /** + * Test arguments for the [AvailableCoreMemoryAllocationPolicy]. + */ + @JvmStatic + fun availableCoreMemoryArgs(): Stream<Arguments> { + val random = Random(1) + val hosts = List(4) { i -> + val view = mockk<HostView>() + every { view.host.uid } returns UUID(0, i.toLong()) + every { view.host.model.cpuCount } returns random.nextInt(1, 16) + every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024) + every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize) + every { view.numberOfActiveServers } returns random.nextInt(0, 6) + every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount) + every { view.toString() } returns "HostView[$i,availableMemory=${view.availableMemory}]" + view + } + + val servers = List(2) { + val server = mockk<Server>() + every { server.flavor.cpuCount } returns random.nextInt(1, 8) + every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512) + server + } + + return Stream.of( + Arguments.of(false, hosts.toSet(), servers[0], hosts[2]), + Arguments.of(false, hosts.toSet(), servers[1], hosts[2]), + Arguments.of(true, hosts.toSet(), servers[1], hosts[1]), + ) + } + + /** + * Test arguments for the [AvailableMemoryAllocationPolicy]. + */ + @JvmStatic + fun availableMemoryArgs(): Stream<Arguments> { + val random = Random(1) + val hosts = List(4) { i -> + val view = mockk<HostView>() + every { view.host.uid } returns UUID(0, i.toLong()) + every { view.host.model.cpuCount } returns random.nextInt(1, 16) + every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024) + every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize) + every { view.numberOfActiveServers } returns random.nextInt(0, 6) + every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount) + every { view.toString() } returns "HostView[$i,availableMemory=${view.availableMemory}]" + view + } + + val servers = List(2) { + val server = mockk<Server>() + every { server.flavor.cpuCount } returns random.nextInt(1, 8) + every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512) + server + } + + return Stream.of( + Arguments.of(false, hosts.toSet(), servers[0], hosts[2]), + Arguments.of(false, hosts.toSet(), servers[1], hosts[2]), + Arguments.of(true, hosts.toSet(), servers[1], hosts[1]), + ) + } + + /** + * Test arguments for the [ProvisionedCoresAllocationPolicy]. + */ + @JvmStatic + fun provisionedCoresArgs(): Stream<Arguments> { + val random = Random(1) + val hosts = List(4) { i -> + val view = mockk<HostView>() + every { view.host.uid } returns UUID(0, i.toLong()) + every { view.host.model.cpuCount } returns random.nextInt(1, 16) + every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024) + every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize) + every { view.numberOfActiveServers } returns random.nextInt(0, 6) + every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount) + every { view.toString() } returns "HostView[$i,provisionedCores=${view.provisionedCores}]" + view + } + + val servers = List(2) { + val server = mockk<Server>() + every { server.flavor.cpuCount } returns random.nextInt(1, 8) + every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512) + server + } + + return Stream.of( + Arguments.of(false, hosts.toSet(), servers[0], hosts[2]), + Arguments.of(false, hosts.toSet(), servers[1], hosts[0]), + Arguments.of(true, hosts.toSet(), servers[1], hosts[0]), + ) + } + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml b/simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml new file mode 100644 index 00000000..0dfb75f2 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Copyright (c) 2021 AtLarge Research + ~ + ~ Permission is hereby granted, free of charge, to any person obtaining a copy + ~ of this software and associated documentation files (the "Software"), to deal + ~ in the Software without restriction, including without limitation the rights + ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + ~ copies of the Software, and to permit persons to whom the Software is + ~ furnished to do so, subject to the following conditions: + ~ + ~ The above copyright notice and this permission notice shall be included in all + ~ copies or substantial portions of the Software. + ~ + ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + ~ SOFTWARE. + --> + +<Configuration status="WARN" packages="org.apache.logging.log4j.core"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/> + </Console> + </Appenders> + <Loggers> + <Logger name="org.opendc" level="trace" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Root level="info"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts index 1ad3f1c6..3bf8a114 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -38,5 +38,6 @@ dependencies { implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) + testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk")) testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 3c4b4410..6d81aa7d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,8 +22,9 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.common.Labels import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server @@ -36,7 +37,6 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.MachinePowerModel import org.opendc.simulator.failures.FailureDomain -import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext @@ -52,6 +52,7 @@ public class SimHost( override val meta: Map<String, Any>, context: CoroutineContext, clock: Clock, + meter: Meter, hypervisor: SimHypervisorProvider, powerModel: MachinePowerModel = ConstantPowerModel(0.0), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), @@ -59,17 +60,13 @@ public class SimHost( /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ - override val scope: CoroutineScope = CoroutineScope(context) + override val scope: CoroutineScope = CoroutineScope(context + Job()) /** * The logger instance of this server. */ private val logger = KotlinLogging.logger {} - override val events: Flow<HostEvent> - get() = _events - internal val _events = EventFlow<HostEvent>() - /** * The event listeners registered with this host. */ @@ -99,18 +96,13 @@ public class SimHost( cpuUsage: Double, cpuDemand: Double ) { - _events.emit( - HostEvent.SliceFinished( - this@SimHost, - requestedWork, - grantedWork, - overcommittedWork, - interferedWork, - cpuUsage, - cpuDemand, - guests.size - ) - ) + _cpuWork.record(requestedWork.toDouble()) + _cpuWorkGranted.record(grantedWork.toDouble()) + _cpuWorkOvercommit.record(overcommittedWork.toDouble()) + _cpuWorkInterference.record(interferedWork.toDouble()) + _cpuUsage.record(cpuUsage) + _cpuDemand.record(cpuDemand) + _cpuPower.record(machine.powerDraw.value) } } ) @@ -132,6 +124,87 @@ public class SimHost( override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) + /** + * The number of guests on the host. + */ + private val _guests = meter.longUpDownCounterBuilder("guests.total") + .setDescription("Number of guests") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The number of active guests on the host. + */ + private val _activeGuests = meter.longUpDownCounterBuilder("guests.active") + .setDescription("Number of active guests") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The CPU usage on the host. + */ + private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage") + .setDescription("The amount of CPU resources used by the host") + .setUnit("MHz") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The CPU demand on the host. + */ + private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand") + .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The requested work for the CPU. + */ + private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage") + .setDescription("The amount of power used by the CPU") + .setUnit("W") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The requested work for the CPU. + */ + private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total") + .setDescription("The amount of work supplied to the CPU") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work actually performed by the CPU. + */ + private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted") + .setDescription("The amount of work performed by the CPU") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work that could not be performed by the CPU due to overcommitting resource. + */ + private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit") + .setDescription("The amount of work not performed by the CPU due to overcommitment") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work that could not be performed by the CPU due to interference. + */ + private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference") + .setDescription("The amount of work not performed by the CPU due to interference") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + init { // Launch hypervisor onto machine scope.launch { @@ -166,12 +239,11 @@ public class SimHost( require(canFit(server)) { "Server does not fit" } val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) guests[server] = guest + _guests.add(1) if (start) { guest.start() } - - _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override fun contains(server: Server): Boolean { @@ -191,6 +263,7 @@ public class SimHost( override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return guest.terminate() + _guests.add(-1) } override fun addListener(listener: HostListener) { @@ -207,6 +280,8 @@ public class SimHost( _state = HostState.DOWN } + override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" + /** * Convert flavor to machine model. */ @@ -226,6 +301,7 @@ public class SimHost( } } + _activeGuests.add(1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } } @@ -236,9 +312,8 @@ public class SimHost( } } + _activeGuests.add(-1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - - _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override suspend fun fail() { diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index e311cd21..830fc868 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,12 +22,14 @@ package org.opendc.compute.simulator -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -37,7 +39,8 @@ import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher -import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -45,23 +48,20 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.time.Clock +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.sdk.toOtelClock import java.util.UUID +import kotlin.coroutines.resume /** * Basic test-suite for the hypervisor. */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @BeforeEach fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( @@ -74,72 +74,98 @@ internal class SimHostTest { * Test overcommitting of resources by the hypervisor. */ @Test - fun testOvercommitted() { + fun testOvercommitted() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) var requestedWork = 0L var grantedWork = 0L var overcommittedWork = 0L - scope.launch { - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider()) - val duration = 5 * 60L - val vmImageA = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) - ), - ) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val duration = 5 * 60L + val vmImageA = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) + ), ) ) - val vmImageB = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) - ) + ) + val vmImageB = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) ) ) ) + ) - delay(5) - - val flavor = MockFlavor(2, 0) - virtDriver.events - .onEach { event -> - when (event) { - is HostEvent.SliceFinished -> { - requestedWork += event.requestedBurst - grantedWork += event.grantedBurst - overcommittedWork += event.overcommissionedBurst - } - } + val flavor = MockFlavor(2, 0) + + // Setup metric reader + val reader = CoroutineMetricReader( + this, listOf(meterProvider as MetricProducer), + object : MetricExporter { + override fun export(metrics: Collection<MetricData>): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong() + grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong() + overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong() + return CompletableResultCode.ofSuccess() } - .launchIn(this) + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + }, + exportInterval = duration * 1000 + ) + + coroutineScope { launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } + + suspendCancellableCoroutine<Unit> { cont -> + virtDriver.addListener(object : HostListener { + private var finished = 0 + + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED && ++finished == 2) { + cont.resume(Unit) + } + } + }) + } } - scope.advanceUntilIdle() + // Ensure last cycle is collected + delay(1000 * duration) + virtDriver.close() + reader.close() assertAll( - { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, { assertEquals(4197600, requestedWork, "Requested work does not match") }, { assertEquals(2157600, grantedWork, "Granted work does not match") }, { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, - { assertEquals(1200006, scope.currentTime) } + { assertEquals(1500001, currentTime) } ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 2d0da1bf..b2d7cc30 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -47,4 +47,6 @@ dependencies { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } + + implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 44436019..40f50235 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -22,26 +22,19 @@ package org.opendc.experiments.capelin -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostEvent import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimHost +import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader @@ -51,9 +44,11 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector -import org.opendc.trace.core.EventTracer +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Clock +import kotlin.coroutines.coroutineContext +import kotlin.coroutines.resume import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -136,13 +131,13 @@ public fun createTraceReader( /** * Construct the environment for a simulated compute service.. */ -public fun createComputeService( - coroutineScope: CoroutineScope, +public suspend fun withComputeService( clock: Clock, + meterProvider: MeterProvider, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, - eventTracer: EventTracer -): ComputeServiceImpl { + block: suspend CoroutineScope.(ComputeService) -> Unit +): Unit = coroutineScope { val hosts = environmentReader .use { it.read() } .map { def -> @@ -151,33 +146,43 @@ public fun createComputeService( def.name, def.model, def.meta, - coroutineScope.coroutineContext, + coroutineContext, clock, + meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider(), def.powerModel ) } + val schedulerMeter = meterProvider.get("opendc-compute") val scheduler = - ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy) for (host in hosts) { scheduler.addHost(host) } - return scheduler + try { + block(this, scheduler) + } finally { + scheduler.close() + hosts.forEach(SimHost::close) + } } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public fun attachMonitor( - coroutineScope: CoroutineScope, +public suspend fun withMonitor( + monitor: ExperimentMonitor, clock: Clock, + metricProducer: MetricProducer, scheduler: ComputeService, - monitor: ExperimentMonitor -) { + block: suspend CoroutineScope.() -> Unit +): Unit = coroutineScope { + val monitorJobs = mutableSetOf<Job>() + // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -186,45 +191,55 @@ public fun attachMonitor( monitor.reportHostStateChange(clock.millis(), host, newState) } }) + } - host.events - .onEach { event -> - when (event) { - is HostEvent.SliceFinished -> monitor.reportHostSlice( - clock.millis(), - event.requestedBurst, - event.grantedBurst, - event.overcommissionedBurst, - event.interferedBurst, - event.cpuUsage, - event.cpuDemand, - event.numberOfDeployedImages, - event.driver - ) - } - } - .launchIn(coroutineScope) + val reader = CoroutineMetricReader( + this, + listOf(metricProducer), + ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }), + exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */ + ) - (host as SimHost).machine.powerDraw - .onEach { monitor.reportPowerConsumption(host, it) } - .launchIn(coroutineScope) + try { + block(this) + } finally { + monitorJobs.forEach(Job::cancel) + reader.close() + monitor.close() } +} - scheduler.events - .onEach { event -> - when (event) { - is ComputeServiceEvent.MetricsAvailable -> - monitor.reportProvisionerMetrics(clock.millis(), event) - } - } - .launchIn(coroutineScope) +public class ComputeMetrics { + public var submittedVms: Int = 0 + public var queuedVms: Int = 0 + public var runningVms: Int = 0 + public var unscheduledVms: Int = 0 + public var finishedVms: Int = 0 +} + +/** + * Collect the metrics of the compute service. + */ +public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { + val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + val res = ComputeMetrics() + try { + // Hack to extract metrics from OpenTelemetry SDK + res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + } catch (cause: Throwable) { + logger.warn(cause) { "Failed to collect metrics" } + } + return res } /** * Process the trace. */ public suspend fun processTrace( - coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<SimWorkload>, scheduler: ComputeService, @@ -233,44 +248,46 @@ public suspend fun processTrace( ) { val client = scheduler.newClient() val image = client.newImage("vm-image") + var offset = Long.MIN_VALUE try { - var submitted = 0 + coroutineScope { + while (reader.hasNext()) { + val entry = reader.next() - while (reader.hasNext()) { - val entry = reader.next() + if (offset < 0) { + offset = entry.start - clock.millis() + } - submitted++ - delay(max(0, entry.start - clock.millis())) - coroutineScope.launch { - chan.send(Unit) - val server = client.newServer( - entry.name, - image, - client.newFlavor( + delay(max(0, (entry.start - offset) - clock.millis())) + launch { + chan.send(Unit) + val server = client.newServer( entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta - ) + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta + ) - server.watch(object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor.reportVmStateChange(clock.millis(), server, newState) + suspendCancellableCoroutine { cont -> + server.watch(object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor.reportVmStateChange(clock.millis(), server, newState) + + if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) { + cont.resume(Unit) + } + } + }) } - }) + } } } - scheduler.events - .takeWhile { - when (it) { - is ComputeServiceEvent.MetricsAvailable -> - it.inactiveVmCount + it.failedVmCount != submitted - } - } - .collect() - delay(1) + yield() } finally { reader.close() client.close() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index f9c96bb6..5fa77161 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -22,19 +22,15 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging -import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy -import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy -import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy -import org.opendc.compute.service.scheduler.RandomAllocationPolicy -import org.opendc.compute.simulator.allocation.* +import org.opendc.compute.service.scheduler.* import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -47,7 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer +import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.util.concurrent.ConcurrentHashMap import kotlin.random.Random @@ -117,16 +113,19 @@ public abstract class Portfolio(name: String) : Experiment(name) { * Perform a single trial for this portfolio. */ @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int) { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) + override fun doRun(repeat: Int): Unit = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seeder = Random(repeat) val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = createAllocationPolicy(seeder) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } @@ -152,15 +151,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environment, - allocationPolicy, - tracer - ) - + withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( @@ -175,31 +166,21 @@ public abstract class Portfolio(name: String) : Experiment(name) { null } - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - trace, - scheduler, - chan, - monitor - ) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + trace, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() } - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } + val monitorResults = collectMetrics(meterProvider as MetricProducer) + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } } /** diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt new file mode 100644 index 00000000..799de60f --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.monitor + +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import org.opendc.compute.service.driver.Host +import java.time.Clock + +/** + * A [MetricExporter] that exports the metrics to the [ExperimentMonitor]. + */ +public class ExperimentMetricExporter( + private val monitor: ExperimentMonitor, + private val clock: Clock, + private val hosts: Map<String, Host> +) : MetricExporter { + override fun export(metrics: Collection<MetricData>): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + reportHostMetrics(metricsByName) + reportProvisionerMetrics(metricsByName) + return CompletableResultCode.ofSuccess() + } + + private fun reportHostMetrics(metrics: Map<String, MetricData>) { + val hostMetrics = mutableMapOf<String, HostMetrics>() + hosts.mapValuesTo(hostMetrics) { HostMetrics() } + + mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> + m.cpuDemand = v + } + + mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> + m.cpuUsage = v + } + + mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v -> + m.powerDraw = v + } + + mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v -> + m.requestedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v -> + m.grantedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> + m.overcommissionedBurst = v.toLong() + } + + mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v -> + m.interferedBurst = v.toLong() + } + + mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> + m.numberOfDeployedImages = v.toInt() + } + + for ((id, hostMetric) in hostMetrics) { + val host = hosts.getValue(id) + monitor.reportHostSlice( + clock.millis(), + hostMetric.requestedBurst, + hostMetric.grantedBurst, + hostMetric.overcommissionedBurst, + hostMetric.interferedBurst, + hostMetric.cpuUsage, + hostMetric.cpuDemand, + hostMetric.numberOfDeployedImages, + host + ) + + monitor.reportPowerConsumption(host, hostMetric.powerDraw) + } + } + + private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSummaryData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.sum) + } + } + } + + private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSumData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + + private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) { + val points = data?.longSumData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + + private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) { + val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + + monitor.reportProvisionerMetrics( + clock.millis(), + hosts, + availableHosts, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + } + + private class HostMetrics { + var requestedBurst: Long = 0 + var grantedBurst: Long = 0 + var overcommissionedBurst: Long = 0 + var interferedBurst: Long = 0 + var cpuUsage: Double = 0.0 + var cpuDemand: Double = 0.0 + var numberOfDeployedImages: Int = 0 + var powerDraw: Double = 0.0 + } + + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() +} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 14cc06dc..5e75c890 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -24,15 +24,13 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState -import java.io.Closeable /** * A monitor watches the events of an experiment. */ -public interface ExperimentMonitor : Closeable { +public interface ExperimentMonitor : AutoCloseable { /** * This method is invoked when the state of a VM changes. */ @@ -68,5 +66,14 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. */ - public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index c9d57a98..0e675d87 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.telemetry.HostEvent @@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } } - override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { + override fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) { provisionerWriter.write( ProvisionerEvent( time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount + totalHostCount, + availableHostCount, + totalVmCount, + activeVmCount, + inactiveVmCount, + waitingVmCount, + failedVmCount ) ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a812490a..02cfdc06 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,19 +22,18 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.yield -import org.junit.jupiter.api.AfterEach +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -45,9 +44,8 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer +import org.opendc.telemetry.sdk.toOtelClock import java.io.File -import java.time.Clock /** * An integration test suite for the SC20 experiments. @@ -55,16 +53,6 @@ import java.time.Clock @OptIn(ExperimentalCoroutinesApi::class) class CapelinIntegrationTest { /** - * The [TestCoroutineScope] to use. - */ - private lateinit var testScope: TestCoroutineScope - - /** - * The simulation clock to use. - */ - private lateinit var clock: Clock - - /** * The monitor used to keep track of the metrics. */ private lateinit var monitor: TestExperimentReporter @@ -74,38 +62,26 @@ class CapelinIntegrationTest { */ @BeforeEach fun setUp() { - testScope = TestCoroutineScope() - clock = DelayControllerClockAdapter(testScope) - monitor = TestExperimentReporter() } - /** - * Tear down the experimental environment. - */ - @AfterEach - fun tearDown() = testScope.cleanupTestCoroutines() - @Test - fun testLarge() { + fun testLarge() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val failures = false val seed = 0 val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: ComputeServiceImpl - val tracer = EventTracer(clock) - - testScope.launch { - scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy, - tracer - ) + lateinit var monitorResults: ComputeMetrics + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( @@ -120,29 +96,28 @@ class CapelinIntegrationTest { null } - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() - monitor.close() } - runSimulation() + monitorResults = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") // Note that these values have been verified beforehand assertAll( - { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, + { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, + { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, + { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, @@ -151,41 +126,33 @@ class CapelinIntegrationTest { } @Test - fun testSmall() { + fun testSmall() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seed = 1 val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - val tracer = EventTracer(clock) - - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy, - tracer - ) - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - traceReader, - scheduler, - chan, - monitor - ) - - yield() - - println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - - scheduler.close() - monitor.close() + + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } } - runSimulation() + val metrics = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") // Note that these values have been verified beforehand assertAll( @@ -197,11 +164,6 @@ class CapelinIntegrationTest { } /** - * Run the simulation. - */ - private fun runSimulation() = testScope.advanceUntilIdle() - - /** * Obtain the trace reader for the test. */ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore b/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore deleted file mode 100644 index ba64707c..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -input/ -output/ diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt deleted file mode 100644 index 9e305b3d..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc18 - -import kotlinx.coroutines.* -import kotlinx.coroutines.test.TestCoroutineScope -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.harness.dsl.Experiment -import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import org.opendc.trace.core.enable -import org.opendc.workflow.service.WorkflowEvent -import org.opendc.workflow.service.WorkflowService -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode -import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy -import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy -import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import java.io.File -import java.io.FileInputStream -import kotlin.math.max - -/** - * The [UnderspecificationExperiment] investigates the impact of scheduler underspecification on performance. - * It focuses on components that must exist (that is, based on their own publications, the correct operation of the - * schedulers under study requires these components), yet have been left underspecified by their author. - */ -public class UnderspecificationExperiment : Experiment("underspecification") { - /** - * The workflow traces to test. - */ - private val trace: String by anyOf("input/traces/chronos_exp_noscaler_ca.gwf") - - /** - * The datacenter environments to test. - */ - private val environment: String by anyOf("input/environments/base.json") - - @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int) { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) - val recording = tracer.openRecording().run { - enable<WorkflowEvent.JobSubmitted>() - enable<WorkflowEvent.JobStarted>() - enable<WorkflowEvent.JobFinished>() - enable<WorkflowEvent.TaskStarted>() - enable<WorkflowEvent.TaskFinished>() - this - } - - testScope.launch { - launch { println("MAKESPAN: ${recording.workflowRuntime()}") } - launch { println("WAIT: ${recording.workflowWaitingTime()}") } - recording.start() - } - - testScope.launch { - val hosts = Sc18EnvironmentReader(FileInputStream(File(environment))) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - testScope.coroutineContext, - clock, - SimSpaceSharedHypervisorProvider() - ) - } - - val compute = ComputeService( - testScope.coroutineContext, - clock, - tracer, - NumberOfActiveServersAllocationPolicy(), - ) - - hosts.forEach { compute.addHost(it) } - - val scheduler = WorkflowService( - testScope.coroutineContext, - clock, - tracer, - compute.newClient(), - mode = WorkflowSchedulerMode.Batch(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - - val reader = GwfTraceReader(File(trace)) - - while (reader.hasNext()) { - val entry = reader.next() - delay(max(0, entry.start * 1000 - clock.millis())) - scheduler.submit(entry.workload) - } - } - - testScope.advanceUntilIdle() - recording.close() - - // Check whether everything went okay - testScope.uncaughtExceptions.forEach { it.printStackTrace() } - assert(testScope.uncaughtExceptions.isEmpty()) { "Errors occurred during execution of the experiment" } - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt deleted file mode 100644 index a8356888..00000000 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.sc18 - -import org.opendc.trace.core.EventStream -import org.opendc.trace.core.onEvent -import org.opendc.workflow.service.WorkflowEvent -import java.util.* -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine - -/** - * This function collects the makespan of workflows that appear in the event stream. - */ -public suspend fun EventStream.workflowRuntime(): Map<UUID, Long> = suspendCoroutine { cont -> - val starts = mutableMapOf<UUID, Long>() - val results = mutableMapOf<UUID, Long>() - - onEvent<WorkflowEvent.JobStarted> { - starts[it.job.uid] = it.timestamp - } - onEvent<WorkflowEvent.JobFinished> { - val start = starts.remove(it.job.uid) ?: return@onEvent - results[it.job.uid] = it.timestamp - start - } - onClose { cont.resume(results) } -} - -/** - * This function collects the waiting time of workflows that appear in the event stream, which the duration between the - * workflow submission and the start of the first task. - */ -public suspend fun EventStream.workflowWaitingTime(): Map<UUID, Long> = suspendCoroutine { cont -> - val starts = mutableMapOf<UUID, Long>() - val results = mutableMapOf<UUID, Long>() - - onEvent<WorkflowEvent.JobStarted> { - starts[it.job.uid] = it.timestamp - } - onEvent<WorkflowEvent.TaskStarted> { - results.computeIfAbsent(it.job.uid) { _ -> - val start = starts.remove(it.job.uid)!! - it.timestamp - start - } - } - onClose { cont.resume(results) } -} - -/** - * This function collects the response time of tasks that appear in the event stream. - */ -public suspend fun EventStream.taskResponse(): Map<UUID, Long> = suspendCoroutine { cont -> - val starts = mutableMapOf<UUID, Long>() - val results = mutableMapOf<UUID, Long>() - - onEvent<WorkflowEvent.JobSubmitted> { - for (task in it.job.tasks) { - starts[task.uid] = it.timestamp - } - } - onEvent<WorkflowEvent.TaskFinished> { - val start = starts.remove(it.job.uid) ?: return@onEvent - results[it.task.uid] = it.timestamp - start - } - onClose { cont.resume(results) } -} diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts index d07fe7a6..fcc78a83 100644 --- a/simulator/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc-runner-web/build.gradle.kts @@ -48,4 +48,6 @@ dependencies { runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}") runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}") + + implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index b9aeecb8..5b717ff7 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -34,9 +34,12 @@ import com.mongodb.client.MongoClients import com.mongodb.client.MongoCollection import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.bson.Document import org.bson.types.ObjectId @@ -45,19 +48,14 @@ import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy import org.opendc.compute.service.scheduler.RandomAllocationPolicy -import org.opendc.compute.simulator.allocation.* -import org.opendc.experiments.capelin.attachMonitor -import org.opendc.experiments.capelin.createComputeService -import org.opendc.experiments.capelin.createFailureDomain +import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.processTrace import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer +import org.opendc.telemetry.sdk.toOtelClock import java.io.File -import kotlin.coroutines.coroutineContext import kotlin.random.Random private val logger = KotlinLogging.logger {} @@ -208,93 +206,85 @@ public class RunnerCli : CliktCommand(name = "runner") { traceReader: Sc20RawParquetTraceReader, performanceInterferenceReader: Sc20PerformanceInterferenceReader? ): WebExperimentMonitor.Result { - val seed = repeat - val traceDocument = scenario.get("trace", Document::class.java) - val workloadName = traceDocument.getString("traceId") - val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() - - val seeder = Random(seed) - val testScope = TestCoroutineScope(Job(parent = coroutineContext[Job])) - val clock = DelayControllerClockAdapter(testScope) - - val chan = Channel<Unit>(Channel.CONFLATED) - - val operational = scenario.get("operational", Document::class.java) - val allocationPolicy = - when (val policyName = operational.getString("schedulerName")) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - else -> throw IllegalArgumentException("Unknown policy $policyName") - } - - val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader( - listOf(traceReader), - performanceInterferenceModel, - Workload(workloadName, workloadFraction), - seed - ) - val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) - val environment = TopologyParser(topologies, topologyId) val monitor = WebExperimentMonitor() - val tracer = EventTracer(clock) - - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environment, - allocationPolicy, - tracer - ) - - val failureDomain = if (operational.getBoolean("failuresEnabled")) { - logger.debug("ENABLING failures") - createFailureDomain( - testScope, - clock, - seeder.nextInt(), - operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, - scheduler, - chan + + try { + runBlockingTest { + val seed = repeat + val traceDocument = scenario.get("trace", Document::class.java) + val workloadName = traceDocument.getString("traceId") + val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() + + val seeder = Random(seed) + val clock = DelayControllerClockAdapter(this) + + val chan = Channel<Unit>(Channel.CONFLATED) + + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val metricProducer = meterProvider as MetricProducer + + val operational = scenario.get("operational", Document::class.java) + val allocationPolicy = + when (val policyName = operational.getString("schedulerName")) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + else -> throw IllegalArgumentException("Unknown policy $policyName") + } + + val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader( + listOf(traceReader), + performanceInterferenceModel, + Workload(workloadName, workloadFraction), + seed ) - } else { - null - } + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) + val environment = TopologyParser(topologies, topologyId) + val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7 + + withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> + val failureDomain = if (failureFrequency > 0) { + logger.debug { "ENABLING failures" } + createFailureDomain( + this, + clock, + seeder.nextInt(), + failureFrequency, + scheduler, + chan + ) + } else { + null + } - attachMonitor(this, clock, scheduler, monitor) - processTrace( - this, - clock, - trace, - scheduler, - chan, - monitor - ) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.close() - } + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + trace, + scheduler, + chan, + monitor + ) + } - try { - testScope.advanceUntilIdle() - testScope.uncaughtExceptions.forEach { it.printStackTrace() } - } finally { - monitor.close() - testScope.cancel() + failureDomain?.cancel() + } + + val monitorResults = collectMetrics(metricProducer) + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } + } + } catch (cause: Throwable) { + logger.warn(cause) { "Experiment failed" } } return monitor.getResult() diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index a8ac6c10..fcd43ea7 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -25,7 +25,6 @@ package org.opendc.runner.web import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -205,13 +204,22 @@ public class WebExperimentMonitor : ExperimentMonitor { private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() - override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { + override fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) { provisionerMetrics = AggregateProvisionerMetrics( - max(event.totalVmCount, provisionerMetrics.vmTotalCount), - max(event.waitingVmCount, provisionerMetrics.vmWaitingCount), - max(event.activeVmCount, provisionerMetrics.vmActiveCount), - max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount), - max(event.failedVmCount, provisionerMetrics.vmFailedCount), + max(totalVmCount, provisionerMetrics.vmTotalCount), + max(waitingVmCount, provisionerMetrics.vmWaitingCount), + max(activeVmCount, provisionerMetrics.vmActiveCount), + max(inactiveVmCount, provisionerMetrics.vmInactiveCount), + max(failedVmCount, provisionerMetrics.vmFailedCount), ) } diff --git a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts index bcb08be7..0221829a 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts +++ b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts @@ -32,7 +32,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-serverless:opendc-serverless-api")) - api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 1c0f94fd..2127b066 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -108,8 +108,11 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine .launchIn(this) launch { - source.consume(consumer) - job.cancel() + try { + source.consume(consumer) + } finally { + job.cancel() + } } } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index f86c4198..830ff70e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -71,6 +71,7 @@ public class SimBareMetalMachine( override fun close() { super.close() + scheduler.close() scope.cancel() } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index 937b6966..f2eea97c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -71,7 +71,7 @@ class SimResourceBenchmarks { fun benchmarkForwardOverhead(state: Workload) { return scope.runBlockingTest { val provider = SimResourceSource(4200.0, clock, scheduler) - val forwarder = SimResourceTransformer() + val forwarder = SimResourceForwarder() provider.startConsumer(forwarder) return@runBlockingTest forwarder.consume(state.consumers[0]) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 9705bd17..5c5ee038 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -54,8 +54,17 @@ public abstract class SimAbstractResourceContext( override val remainingWork: Double get() { val activeCommand = activeCommand ?: return 0.0 - return computeRemainingWork(activeCommand, clock.millis()) + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + computeRemainingWork(activeCommand, now).also { _remainingWork = it } + } else { + _remainingWork + } } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE /** * A flag to indicate the state of the context. @@ -201,6 +210,9 @@ public abstract class SimAbstractResourceContext( // Flush may not be called when the resource consumer has finished throw IllegalStateException() } + + // Flush remaining work cache + _remainingWorkFlush = Long.MIN_VALUE } catch (cause: Throwable) { doStop(cause) } finally { diff --git a/simulator/opendc-trace/build.gradle.kts b/simulator/opendc-telemetry/build.gradle.kts index a1a751a2..7edfd134 100644 --- a/simulator/opendc-trace/build.gradle.kts +++ b/simulator/opendc-telemetry/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts index 3051f733..d9a4b4dd 100644 --- a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts +++ b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Event tracing library for OpenDC" +description = "Telemetry API for OpenDC" /* Build configuration */ plugins { @@ -29,5 +29,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api("org.jetbrains.kotlinx:kotlinx-coroutines-core") + api("io.opentelemetry:opentelemetry-api:${versions.otelApi}") + api("io.opentelemetry:opentelemetry-api-metrics:${versions.otelApiMetrics}") } diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts index 02e77c7c..350a0f74 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -description = "Experiments for the SC18 article" +description = "Telemetry SDK for OpenDC" /* Build configuration */ plugins { `kotlin-library-conventions` - `experiment-conventions` } dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-harness")) - implementation(project(":opendc-format")) - implementation(project(":opendc-workflow:opendc-workflow-service")) - implementation(project(":opendc-simulator:opendc-simulator-core")) - implementation(project(":opendc-compute:opendc-compute-simulator")) + api(project(":opendc-telemetry:opendc-telemetry-api")) + api("org.jetbrains.kotlinx:kotlinx-coroutines-core") + api("io.opentelemetry:opentelemetry-sdk:${versions.otelSdk}") + api("io.opentelemetry:opentelemetry-sdk-metrics:${versions.otelSdkMetrics}") + + implementation("io.github.microutils:kotlin-logging") } diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt index 1f4bb267..86f6647e 100644 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt +++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,15 +20,20 @@ * SOFTWARE. */ -package org.opendc.trace.core +package org.opendc.telemetry.sdk + +import io.opentelemetry.sdk.common.Clock /** - * Base class for events reported by the OpenDC tracing library. + * An adapter class that bridges a [java.time.Clock] to a [Clock] */ -public abstract class Event(timestamp: Long = Long.MIN_VALUE) { - /** - * The timestamp at which the event has occurred. - */ - public var timestamp: Long = timestamp - internal set +public class OtelClockAdapter(private val clock: java.time.Clock) : Clock { + override fun now(): Long = clock.millis() + + override fun nanoTime(): Long = clock.millis() * 1_000_000L } + +/** + * Convert the specified [java.time.Clock] to a [Clock]. + */ +public fun java.time.Clock.toOtelClock(): Clock = OtelClockAdapter(this) diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt new file mode 100644 index 00000000..9ee16fac --- /dev/null +++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.sdk.metrics.export + +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import mu.KotlinLogging +import java.util.* +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +/** + * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every + * export interval. + * + * The reader runs in a [CoroutineScope] which enables collection of metrics in environments with a custom clock. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param producers The metric producers to gather metrics from. + * @param exporter The export to export the metrics to. + * @param exportInterval The export interval in milliseconds. + */ +public class CoroutineMetricReader( + scope: CoroutineScope, + private val producers: List<MetricProducer>, + private val exporter: MetricExporter, + private val exportInterval: Long = 60_000 +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS) + + /** + * The metric reader job. + */ + private val readerJob = scope.launch { + while (isActive) { + delay(exportInterval) + + val metrics = mutableListOf<MetricData>() + for (producer in producers) { + metrics.addAll(producer.collectAllMetrics()) + } + chan.send(Collections.unmodifiableList(metrics)) + } + } + + /** + * The exporter job runs in the background to actually export the metrics. + */ + private val exporterJob = chan.consumeAsFlow() + .onEach { metrics -> + suspendCoroutine<Unit> { cont -> + try { + val result = exporter.export(metrics) + result.whenComplete { + if (!result.isSuccess) { + logger.trace { "Exporter failed" } + } + cont.resume(Unit) + } + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + cont.resume(Unit) + } + } + } + .launchIn(scope) + + override fun close() { + readerJob.cancel() + exporterJob.cancel() + } +} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt deleted file mode 100644 index ac2b5e9b..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core - -/** - * A stream of [Event]s. - */ -public interface EventStream : AutoCloseable { - /** - * Register the specified [action] to be performed on every event in the stream. - */ - public fun onEvent(action: (Event) -> Unit) - - /** - * Register the specified [action] to be performed on events of type [E]. - */ - public fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit) - - /** - * Register the specified [action] to be performed on errors. - */ - public fun onError(action: (Throwable) -> Unit) - - /** - * Register the specified [action] to be performed when the stream is closed. - */ - public fun onClose(action: Runnable) - - /** - * Unregister the specified [action]. - * - * @return `true` if an action was unregistered, `false` otherwise. - */ - public fun remove(action: Any): Boolean - - /** - * Start the processing of events in the current coroutine. - * - * @throws IllegalStateException if the stream was already started. - */ - public suspend fun start() - - /** - * Release all resources associated with this stream. - * - * @throws IllegalStateException if the stream was already stopped. - */ - public override fun close() -} - -/** - * Register the specified [action] to be performed on events of type [E]. - */ -public inline fun <reified E : Event> EventStream.onEvent(noinline action: (E) -> Unit) { - onEvent(E::class.java, action) -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt deleted file mode 100644 index 4f978f4f..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core - -import org.opendc.trace.core.internal.EventTracerImpl -import java.time.Clock - -/** - * An [EventTracer] is responsible for recording the events that occur in a system. - */ -public interface EventTracer : AutoCloseable { - /** - * The [Clock] used to measure the timestamp and duration of the events. - */ - public val clock: Clock - - /** - * Determine whether the specified [Event] class is currently enabled in any of the active recordings. - * - * @return `true` if the event is enabled, `false` otherwise. - */ - public fun isEnabled(type: Class<out Event>): Boolean - - /** - * Commit the specified [event] to the appropriate event streams. - */ - public fun commit(event: Event) - - /** - * Create a new [RecordingStream] which is able to actively capture events emitted to the [EventTracer]. - */ - public fun openRecording(): RecordingStream - - /** - * Terminate the lifecycle of the [EventTracer] and close its associated event streams. - */ - public override fun close() - - public companion object { - /** - * Construct a new [EventTracer] instance. - * - * @param clock The [Clock] used to measure the timestamps. - */ - @JvmName("create") - public operator fun invoke(clock: Clock): EventTracer = EventTracerImpl(clock) - } -} - -/** - * Determine whether the [Event] of type [E] is currently enabled in any of the active recordings. - * - * @return `true` if the event is enabled, `false` otherwise. - */ -public inline fun <reified E : Event> EventTracer.isEnabled(): Boolean = isEnabled(E::class.java) - -/** - * Lazily construct an [Event] of type [E] if it is enabled and commit it to the appropriate event streams. - */ -public inline fun <reified E : Event> EventTracer.commit(block: () -> E) { - if (isEnabled<E>()) { - commit(block()) - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt deleted file mode 100644 index 84dcc61a..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core - -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.channels.sendBlocking -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.callbackFlow - -/** - * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public fun EventStream.asFlow(): Flow<Event> = callbackFlow { - onEvent { sendBlocking(it) } - onError { cancel(CancellationException("API error", it)) } - onClose { channel.close() } - awaitClose { this@asFlow.close() } -} - -/** - * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public fun EventStream.consumeAsFlow(): Flow<Event> = callbackFlow { - onEvent { sendBlocking(it) } - onError { cancel(CancellationException("API error", it)) } - start() -} - -/** - * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public inline fun <reified E : Event> EventStream.asTypedFlow(): Flow<E> = callbackFlow { - onEvent<E> { sendBlocking(it) } - onError { cancel(CancellationException("API error", it)) } - onClose { channel.close() } - awaitClose { this@asTypedFlow.close() } -} - -/** - * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public inline fun <reified E : Event> EventStream.consumeAsTypedFlow(): Flow<E> = callbackFlow { - onEvent<E> { sendBlocking(it) } - onError { cancel(CancellationException("API error", it)) } - start() -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt deleted file mode 100644 index f49e7c49..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core - -/** - * A recording stream that produces events from an [EventTracer]. - */ -public interface RecordingStream : EventStream { - /** - * Enable recording of the specified event [type]. - */ - public fun enable(type: Class<out Event>) - - /** - * Disable recording of the specified event [type] - */ - public fun disable(type: Class<out Event>) -} - -/** - * Enable recording of events of type [E]. - */ -public inline fun <reified E : Event> RecordingStream.enable() { - enable(E::class.java) -} - -/** - * Disable recording of events of type [E]. - */ -public inline fun <reified E : Event> RecordingStream.disable() { - enable(E::class.java) -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt deleted file mode 100644 index fac99664..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core.internal - -import kotlinx.coroutines.suspendCancellableCoroutine -import org.opendc.trace.core.Event -import org.opendc.trace.core.EventStream -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume - -/** - * Base implementation of the [EventStream] implementation. - */ -internal abstract class AbstractEventStream : EventStream { - /** - * The state of the stream. - */ - protected var state = StreamState.Pending - - /** - * The event actions to dispatch to. - */ - private val eventActions = mutableListOf<EventDispatcher>() - - /** - * The error actions to use. - */ - private val errorActions = mutableListOf<(Throwable) -> Unit>() - - /** - * The close actions to use. - */ - private val closeActions = mutableListOf<Runnable>() - - /** - * The continuation that is invoked when the stream closes. - */ - private var cont: Continuation<Unit>? = null - - /** - * Dispatch the specified [event] to this stream. - */ - fun dispatch(event: Event) { - val actions = eventActions - - // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type) - for (action in actions) { - if (!action.accepts(event)) { - continue - } - - try { - action(event) - } catch (e: Exception) { - handleError(e) - } - } - } - - /** - * Handle the specified [throwable] that occurred while dispatching an event. - */ - private fun handleError(throwable: Throwable) { - val actions = errorActions - - // Default exception handler - if (actions.isEmpty()) { - throwable.printStackTrace() - return - } - - for (action in actions) { - action(throwable) - } - } - - override fun onEvent(action: (Event) -> Unit) { - eventActions += EventDispatcher(null, action) - } - - override fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit) { - @Suppress("UNCHECKED_CAST") // This cast must succeed - eventActions += EventDispatcher(type, action as (Event) -> Unit) - } - - override fun onError(action: (Throwable) -> Unit) { - errorActions += action - } - - override fun onClose(action: Runnable) { - closeActions += action - } - - override fun remove(action: Any): Boolean { - return eventActions.removeIf { it.action == action } || errorActions.remove(action) || closeActions.remove(action) - } - - override suspend fun start() { - check(state == StreamState.Pending) { "Stream has already started/closed" } - - state = StreamState.Started - - return suspendCancellableCoroutine { cont -> this.cont = cont } - } - - override fun close() { - if (state == StreamState.Closed) { - return - } - - state = StreamState.Closed - cont?.resume(Unit) - - val actions = closeActions - for (action in actions) { - action.run() - } - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt deleted file mode 100644 index 8b6de75e..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core.internal - -import org.opendc.trace.core.Event - -/** - * The [Dispatcher] is responsible for dispatching events onto configured actions. - */ -internal class Dispatcher { - /** - * The event actions to dispatch to. - */ - private val eventActions = mutableListOf<EventDispatcher>() - - /** - * The error actions to use. - */ - private val errorActions = mutableListOf<(Throwable) -> Unit>() - - /** - * Dispatch the specified [event]. - */ - fun dispatch(event: Event) { - val actions = eventActions - - // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type) - for (action in actions) { - if (!action.accepts(event)) { - continue - } - - try { - action(event) - } catch (e: Exception) { - handleError(e) - } - } - } - - /** - * Handle the specified [throwable] that occurred while dispatching an event. - */ - private fun handleError(throwable: Throwable) { - val actions = errorActions - - // Default exception handler - if (actions.isEmpty()) { - throwable.printStackTrace() - return - } - - for (action in actions) { - action(throwable) - } - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt deleted file mode 100644 index b2a662eb..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core.internal - -import org.opendc.trace.core.Event - -/** - * A dispatcher responsible for conditionally dispatching an event. - */ -internal class EventDispatcher(val type: Class<out Event>?, val action: (Event) -> Unit) { - /** - * Determine whether this dispatcher accepts the specified event. - */ - fun accepts(event: Event): Boolean { - return type == null || type.isAssignableFrom(event.javaClass) - } - - /** - * Invoke the specified [event] on this action. - */ - operator fun invoke(event: Event) { - action(event) - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt deleted file mode 100644 index e85d0779..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core.internal - -import org.opendc.trace.core.Event -import org.opendc.trace.core.EventTracer -import org.opendc.trace.core.RecordingStream -import java.lang.reflect.Modifier -import java.time.Clock -import java.util.* - -/** - * Default implementation of the [EventTracer] interface. - */ -internal class EventTracerImpl(override val clock: Clock) : EventTracer { - /** - * The set of enabled events. - */ - private val enabledEvents = IdentityHashMap<Class<out Event>, MutableList<Stream>>() - - /** - * The event streams created by the tracer. - */ - private val streams = WeakHashMap<Stream, Unit>() - - /** - * A flag to indicate that the stream is closed. - */ - private var isClosed: Boolean = false - - override fun isEnabled(type: Class<out Event>): Boolean = enabledEvents.containsKey(type) - - override fun commit(event: Event) { - val type = event.javaClass - - // Assign timestamp if not set - if (event.timestamp == Long.MIN_VALUE) { - event.timestamp = clock.millis() - } - - if (!isEnabled(type) || isClosed) { - return - } - - val streams = enabledEvents[type] ?: return - for (stream in streams) { - stream.dispatch(event) - } - } - - override fun openRecording(): RecordingStream = Stream() - - override fun close() { - isClosed = true - - val streams = streams - for ((stream, _) in streams) { - stream.close() - } - - enabledEvents.clear() - } - - /** - * Enable the specified [type] for the given [stream]. - */ - private fun enableFor(type: Class<out Event>, stream: Stream) { - val res = enabledEvents.computeIfAbsent(type) { mutableListOf() } - res.add(stream) - } - - /** - * Disable the specified [type] for the given [stream]. - */ - private fun disableFor(type: Class<out Event>, stream: Stream) { - enabledEvents[type]?.remove(stream) - } - - /** - * The [RecordingStream] associated with this [EventTracer] implementation. - */ - private inner class Stream : AbstractEventStream(), RecordingStream { - /** - * The set of enabled events for this stream. - */ - private val enabledEvents = IdentityHashMap<Class<out Event>, Unit>() - - init { - streams[this] = Unit - } - - override fun enable(type: Class<out Event>) { - validateEventClass(type) - - if (enabledEvents.put(type, Unit) == null && state == StreamState.Started) { - enableFor(type, this) - } - } - - override fun disable(type: Class<out Event>) { - validateEventClass(type) - - if (enabledEvents.remove(type) != null && state == StreamState.Started) { - disableFor(type, this) - } - } - - override suspend fun start() { - val enabledEvents = enabledEvents - for ((event, _) in enabledEvents) { - enableFor(event, this) - } - - super.start() - } - - override fun close() { - val enabledEvents = enabledEvents - for ((event, _) in enabledEvents) { - disableFor(event, this) - } - - // Remove this stream from the active streams - streams.remove(this) - - super.close() - } - - /** - * Validate the specified event subclass. - */ - private fun validateEventClass(type: Class<out Event>) { - require(!Modifier.isAbstract(type.modifiers)) { "Abstract event classes are not allowed" } - require(Event::class.java.isAssignableFrom(type)) { "Must be subclass to ${Event::class.qualifiedName}" } - } - } -} diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt deleted file mode 100644 index 9f411e0d..00000000 --- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.core.internal - -/** - * The state of a [Stream]. - */ -internal enum class StreamState { - Pending, Started, Closed -} diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt deleted file mode 100644 index 10f29f4e..00000000 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.utils.flow - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.consumeAsFlow - -/** - * A [Flow] that can be used to emit events. - */ -public interface EventFlow<T> : Flow<T> { - /** - * Emit the specified [event]. - */ - public fun emit(event: T) - - /** - * Close the flow. - */ - public fun close() -} - -/** - * Creates a new [EventFlow]. - */ -@Suppress("FunctionName") -public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl() - -/** - * Internal implementation of the [EventFlow] class. - */ -@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -private class EventFlowImpl<T> : EventFlow<T> { - private var closed: Boolean = false - private val subscribers = mutableListOf<SendChannel<T>>() - - override fun emit(event: T) { - if (closed) { - return - } - - val it = subscribers.iterator() - synchronized(this) { - while (it.hasNext()) { - val chan = it.next() - if (chan.isClosedForSend) { - it.remove() - } else { - chan.offer(event) - } - } - } - } - - override fun close() { - synchronized(this) { - closed = true - - for (chan in subscribers) { - chan.close() - } - - subscribers.clear() - } - } - - @InternalCoroutinesApi - override suspend fun collect(collector: FlowCollector<T>) { - val channel: Channel<T> - synchronized(this) { - if (closed) { - return - } - - channel = Channel(Channel.UNLIMITED) - subscribers.add(channel) - } - try { - channel.consumeAsFlow().collect(collector) - } finally { - channel.close() - } - } - - override fun toString(): String = "EventFlow" -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts index bec18ae9..040a9ff6 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -33,13 +33,14 @@ dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-workflow:opendc-workflow-api")) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-trace:opendc-trace-core")) + api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) testImplementation(project(":opendc-compute:opendc-compute-simulator")) testImplementation(project(":opendc-format")) + testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk")) testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") } diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt deleted file mode 100644 index bb2ad6c6..00000000 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import org.opendc.trace.core.Event -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task - -/** - * An event emitted by the [WorkflowService]. - */ -public sealed class WorkflowEvent : Event() { - /** - * The [WorkflowService] that emitted the event. - */ - public abstract val service: WorkflowService - - /** - * This event is emitted when a job was submitted to the scheduler. - */ - public data class JobSubmitted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has become active. - */ - public data class JobStarted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has finished processing. - */ - public data class JobFinished( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. - */ - public data class TaskStarted( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. - */ - public data class TaskFinished( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 2f83e376..d3358ef1 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,9 +22,8 @@ package org.opendc.workflow.service -import kotlinx.coroutines.flow.Flow +import io.opentelemetry.api.metrics.Meter import org.opendc.compute.api.ComputeClient -import org.opendc.trace.core.EventTracer import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode @@ -42,14 +41,14 @@ import kotlin.coroutines.CoroutineContext */ public interface WorkflowService : AutoCloseable { /** - * The events emitted by the workflow scheduler. + * Submit the specified [Job] to the workflow service for scheduling. */ - public val events: Flow<WorkflowEvent> + public suspend fun submit(job: Job) /** - * Submit the specified [Job] to the workflow service for scheduling. + * Run the specified [Job] and suspend execution until the job is finished. */ - public suspend fun submit(job: Job) + public suspend fun run(job: Job) /** * Terminate the lifecycle of the workflow service, stopping all running workflows. @@ -63,6 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. + * @param meter The meter to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - tracer: EventTracer, + meter: Meter, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - tracer, + meter, compute, mode, jobAdmissionPolicy, diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 85a88acd..32191b8f 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,17 +22,11 @@ package org.opendc.workflow.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.flow.Flow +import io.opentelemetry.api.metrics.Meter +import kotlinx.coroutines.* import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* -import org.opendc.trace.core.EventTracer -import org.opendc.trace.core.consumeAsFlow -import org.opendc.trace.core.enable import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* @@ -43,7 +37,9 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock import java.util.* +import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -52,7 +48,7 @@ import kotlin.coroutines.CoroutineContext public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, - internal val tracer: EventTracer, + private val meter: Meter, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -63,7 +59,7 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - internal val scope = CoroutineScope(context) + internal val scope = CoroutineScope(context + Job()) /** * The logger instance to use. @@ -106,6 +102,11 @@ public class WorkflowServiceImpl( internal val taskByServer = mutableMapOf<Server, TaskState>() /** + * The continuation of the jobs. + */ + private val conts = mutableMapOf<Job, Continuation<Unit>>() + + /** * The root listener of this scheduler. */ private val rootListener = object : WorkflowSchedulerListener { @@ -151,6 +152,54 @@ public class WorkflowServiceImpl( } } + /** + * The number of jobs that have been submitted to the service. + */ + private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + .setDescription("Number of submitted jobs") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + .setDescription("Number of jobs running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedJobs = meter.longCounterBuilder("jobs.finished") + .setDescription("Number of jobs that finished running") + .setUnit("1") + .build() + + /** + * The number of tasks that have been submitted to the service. + */ + private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + .setDescription("Number of submitted tasks") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + .setDescription("Number of tasks running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedTasks = meter.longCounterBuilder("tasks.finished") + .setDescription("Number of tasks that finished running") + .setUnit("1") + .build() + private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic @@ -167,16 +216,7 @@ public class WorkflowServiceImpl( } } - override val events: Flow<WorkflowEvent> = tracer.openRecording().let { - it.enable<WorkflowEvent.JobSubmitted>() - it.enable<WorkflowEvent.JobStarted>() - it.enable<WorkflowEvent.JobFinished>() - it.enable<WorkflowEvent.TaskStarted>() - it.enable<WorkflowEvent.TaskFinished>() - it.consumeAsFlow().map { event -> event as WorkflowEvent } - } - - override suspend fun submit(job: Job) { + override suspend fun run(job: Job) { // J1 Incoming Jobs val jobInstance = JobState(job, clock.millis()) val instances = job.tasks.associateWith { @@ -193,14 +233,24 @@ public class WorkflowServiceImpl( if (instance.isRoot) { instance.state = TaskStatus.READY } + + submittedTasks.add(1) } - instances.values.toCollection(jobInstance.tasks) - incomingJobs += jobInstance - rootListener.jobSubmitted(jobInstance) - tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) + return suspendCancellableCoroutine { cont -> + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + conts[job] = cont + + submittedJobs.add(1) + + requestCycle() + } + } - requestCycle() + override suspend fun submit(job: Job) { + scope.launch { run(job) } } override fun close() { @@ -231,12 +281,8 @@ public class WorkflowServiceImpl( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - tracer.commit( - WorkflowEvent.JobStarted( - this, - jobInstance.job - ) - ) + + runningJobs.add(1) rootListener.jobStarted(jobInstance) } @@ -311,18 +357,11 @@ public class WorkflowServiceImpl( public override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { - ServerState.PROVISIONING -> { - } + ServerState.PROVISIONING -> {} ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - tracer.commit( - WorkflowEvent.TaskStarted( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) + runningTasks.add(1) rootListener.taskStarted(task) } ServerState.TERMINATED, ServerState.ERROR -> { @@ -338,13 +377,9 @@ public class WorkflowServiceImpl( task.finishedAt = clock.millis() job.tasks.remove(task) activeTasks -= task - tracer.commit( - WorkflowEvent.TaskFinished( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) + + runningTasks.add(-1) + finishedTasks.add(1) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -371,8 +406,11 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job - tracer.commit(WorkflowEvent.JobFinished(this, job.job)) + runningJobs.add(-1) + finishedJobs.add(1) rootListener.jobFinished(job) + + conts.remove(job.job)?.resume(Unit) } public fun addListener(listener: WorkflowSchedulerListener) { diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt deleted file mode 100644 index 2161f5f2..00000000 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNotEquals -import org.junit.jupiter.api.DisplayName -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider -import org.opendc.simulator.utils.DelayControllerClockAdapter -import org.opendc.trace.core.EventTracer -import org.opendc.workflow.service.internal.WorkflowServiceImpl -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode -import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy -import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy -import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import kotlin.math.max - -/** - * Integration test suite for the [WorkflowServiceImpl]. - */ -@DisplayName("WorkflowServiceImpl") -@OptIn(ExperimentalCoroutinesApi::class) -internal class StageWorkflowSchedulerIntegrationTest { - /** - * A large integration test where we check whether all tasks in some trace are executed correctly. - */ - @Test - fun testTrace() { - var jobsSubmitted = 0L - var jobsStarted = 0L - var jobsFinished = 0L - var tasksStarted = 0L - var tasksFinished = 0L - - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - val tracer = EventTracer(clock) - - val scheduler = let { - val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - testScope.coroutineContext, - clock, - SimSpaceSharedHypervisorProvider() - ) - } - - val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) - - hosts.forEach { compute.addHost(it) } - - WorkflowService( - testScope.coroutineContext, - clock, - tracer, - compute.newClient(), - mode = WorkflowSchedulerMode.Batch(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - } - - testScope.launch { - scheduler.events - .onEach { event -> - when (event) { - is WorkflowEvent.JobStarted -> jobsStarted++ - is WorkflowEvent.JobFinished -> jobsFinished++ - is WorkflowEvent.TaskStarted -> tasksStarted++ - is WorkflowEvent.TaskFinished -> tasksFinished++ - } - } - .collect() - } - - testScope.launch { - val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) - - while (reader.hasNext()) { - val entry = reader.next() - jobsSubmitted++ - delay(max(0, entry.start - clock.millis())) - scheduler.submit(entry.workload) - } - } - - testScope.advanceUntilIdle() - - assertAll( - { assertEquals(emptyList<Throwable>(), testScope.uncaughtExceptions) }, - { assertNotEquals(0, jobsSubmitted, "No jobs submitted") }, - { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") }, - { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") }, - { assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") } - ) - } -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt new file mode 100644 index 00000000..46c0d835 --- /dev/null +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHost +import org.opendc.format.environment.sc18.Sc18EnvironmentReader +import org.opendc.format.trace.gwf.GwfTraceReader +import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import kotlin.math.max + +/** + * Integration test suite for the [WorkflowServiceImpl]. + */ +@DisplayName("WorkflowServiceImpl") +@OptIn(ExperimentalCoroutinesApi::class) +internal class WorkflowServiceIntegrationTest { + /** + * A large integration test where we check whether all tasks in some trace are executed correctly. + */ + @Test + fun testTrace() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) + .use { it.read() } + .map { def -> + SimHost( + def.uid, + def.name, + def.model, + def.meta, + coroutineContext, + clock, + MeterProvider.noop().get("opendc-compute-simulator"), + SimSpaceSharedHypervisorProvider() + ) + } + + val meter = MeterProvider.noop().get("opendc-compute") + val compute = ComputeService(coroutineContext, clock, meter, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + + hosts.forEach { compute.addHost(it) } + + val scheduler = WorkflowService( + coroutineContext, + clock, + meterProvider.get("opendc-workflow"), + compute.newClient(), + mode = WorkflowSchedulerMode.Batch(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ) + + val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) + var offset = Long.MIN_VALUE + + coroutineScope { + while (reader.hasNext()) { + val entry = reader.next() + + if (offset < 0) { + offset = entry.start - clock.millis() + } + + delay(max(0, (entry.start - offset) - clock.millis())) + launch { + scheduler.run(entry.workload) + } + } + } + + hosts.forEach(SimHost::close) + scheduler.close() + compute.close() + + val metrics = collectMetrics(meterProvider as MetricProducer) + + assertAll( + { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, + { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, + { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") } + ) + } + + class WorkflowMetrics { + var jobsSubmitted = 0L + var jobsActive = 0L + var jobsFinished = 0L + var tasksSubmitted = 0L + var tasksActive = 0L + var tasksFinished = 0L + } + + /** + * Collect the metrics of the workflow service. + */ + private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics { + val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + val res = WorkflowMetrics() + res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.last()?.value ?: 0 + res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.last()?.value ?: 0 + res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.last()?.value ?: 0 + res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.last()?.value ?: 0 + return res + } +} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml index 70a0eacc..edcf41ed 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml @@ -28,6 +28,9 @@ </Console> </Appenders> <Loggers> + <Logger name="org.opendc.workflow" level="info" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> <Root level="warn"> <AppenderRef ref="Console"/> </Root> diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index d5603664..2e297997 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -31,13 +31,13 @@ include(":opendc-serverless:opendc-serverless-api") include(":opendc-serverless:opendc-serverless-service") include(":opendc-serverless:opendc-serverless-simulator") include(":opendc-format") -include(":opendc-experiments:opendc-experiments-sc18") include(":opendc-experiments:opendc-experiments-capelin") include(":opendc-runner-web") include(":opendc-simulator:opendc-simulator-core") include(":opendc-simulator:opendc-simulator-resources") include(":opendc-simulator:opendc-simulator-compute") include(":opendc-simulator:opendc-simulator-failures") -include(":opendc-trace:opendc-trace-core") +include(":opendc-telemetry:opendc-telemetry-api") +include(":opendc-telemetry:opendc-telemetry-sdk") include(":opendc-harness") include(":opendc-utils") |
