summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r--simulator/buildSrc/src/main/kotlin/Versions.kt5
-rw-r--r--simulator/gradle.properties6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/build.gradle.kts5
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt13
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt47
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt72
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt31
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt31
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt30
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt30
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt32
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt30
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt424
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt4
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt46
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt7
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt390
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt80
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt81
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt285
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt219
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml38
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt121
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt142
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt179
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt69
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt171
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt15
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt26
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt136
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/.gitignore2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt137
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt86
-rw-r--r--simulator/opendc-runner-web/build.gradle.kts2
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt172
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt22
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts1
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt7
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt1
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt14
-rw-r--r--simulator/opendc-telemetry/build.gradle.kts (renamed from simulator/opendc-trace/build.gradle.kts)2
-rw-r--r--simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts (renamed from simulator/opendc-trace/opendc-trace-core/build.gradle.kts)7
-rw-r--r--simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts (renamed from simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts)16
-rw-r--r--simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt (renamed from simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt)23
-rw-r--r--simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt99
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt76
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt84
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt73
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt52
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt139
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt77
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt44
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt157
-rw-r--r--simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt30
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt112
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts3
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt79
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt16
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt134
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt139
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt159
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml3
-rw-r--r--simulator/settings.gradle.kts4
72 files changed, 2435 insertions, 2337 deletions
diff --git a/simulator/buildSrc/src/main/kotlin/Versions.kt b/simulator/buildSrc/src/main/kotlin/Versions.kt
index d1df6284..6aa9260b 100644
--- a/simulator/buildSrc/src/main/kotlin/Versions.kt
+++ b/simulator/buildSrc/src/main/kotlin/Versions.kt
@@ -49,6 +49,11 @@ public class Versions(private val project: Project) {
val kotlinxCoroutines by version(name = "kotlinx-coroutines")
+ val otelApi by version(name = "opentelemetry-api")
+ val otelApiMetrics by version(name = "opentelemetry-api-metrics")
+ val otelSdk by version(name = "opentelemetry-sdk")
+ val otelSdkMetrics by version(name = "opentelemetry-sdk-metrics")
+
/**
* Obtain the version for the specified [dependency][name].
diff --git a/simulator/gradle.properties b/simulator/gradle.properties
index 8d41408c..99b08bb2 100644
--- a/simulator/gradle.properties
+++ b/simulator/gradle.properties
@@ -28,6 +28,12 @@ kotlin-logging.version = 2.0.6
slf4j.version = 1.7.30
log4j.version = 2.14.1
+# Dependencies (Telemetry)
+opentelemetry-api.version = 1.0.1
+opentelemetry-api-metrics.version = 1.0.1-alpha
+opentelemetry-sdk.version = 1.0.1
+opentelemetry-sdk-metrics.version = 1.0.1-alpha
+
# Dependencies (CLI)
clikt.version = 3.1.0
progressbar.version = 0.9.0
diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
index 1b09ef6d..909e2dcd 100644
--- a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
@@ -26,15 +26,16 @@ description = "OpenDC Compute Service implementation"
plugins {
`kotlin-library-conventions`
`testing-conventions`
+ `jacoco-conventions`
}
dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-compute:opendc-compute-api"))
- api(project(":opendc-trace:opendc-trace-core"))
+ api(project(":opendc-telemetry:opendc-telemetry-api"))
implementation(project(":opendc-utils"))
implementation("io.github.microutils:kotlin-logging")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
- testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}")
+ testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl")
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 593e4b56..98566da3 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -22,12 +22,11 @@
package org.opendc.compute.service
-import kotlinx.coroutines.flow.Flow
+import io.opentelemetry.api.metrics.Meter
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AllocationPolicy
-import org.opendc.trace.core.EventTracer
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -36,11 +35,6 @@ import kotlin.coroutines.CoroutineContext
*/
public interface ComputeService : AutoCloseable {
/**
- * The events emitted by the service.
- */
- public val events: Flow<ComputeServiceEvent>
-
- /**
* The hosts that are used by the compute service.
*/
public val hosts: Set<Host>
@@ -76,17 +70,16 @@ public interface ComputeService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
- * @param tracer The event tracer to use.
* @param allocationPolicy The allocation policy to use.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- tracer: EventTracer,
+ meter: Meter,
allocationPolicy: AllocationPolicy,
schedulingQuantum: Long = 300000,
): ComputeService {
- return ComputeServiceImpl(context, clock, tracer, allocationPolicy, schedulingQuantum)
+ return ComputeServiceImpl(context, clock, meter, allocationPolicy, schedulingQuantum)
}
}
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt
deleted file mode 100644
index 193008a7..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service
-
-/**
- * An event that is emitted by the [ComputeService].
- */
-public sealed class ComputeServiceEvent {
- /**
- * The service that has emitted the event.
- */
- public abstract val provisioner: ComputeService
-
- /**
- * An event emitted for writing metrics.
- */
- public data class MetricsAvailable(
- override val provisioner: ComputeService,
- public val totalHostCount: Int,
- public val availableHostCount: Int,
- public val totalVmCount: Int,
- public val activeVmCount: Int,
- public val inactiveVmCount: Int,
- public val waitingVmCount: Int,
- public val failedVmCount: Int
- ) : ComputeServiceEvent()
-}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
index c3c39572..bed15dfd 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -22,7 +22,6 @@
package org.opendc.compute.service.driver
-import kotlinx.coroutines.flow.Flow
import org.opendc.compute.api.Server
import java.util.*
@@ -56,11 +55,6 @@ public interface Host {
public val meta: Map<String, Any>
/**
- * The events emitted by the driver.
- */
- public val events: Flow<HostEvent>
-
- /**
* Determine whether the specified [instance][server] can still fit on this host.
*/
public fun canFit(server: Server): Boolean
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt
deleted file mode 100644
index 97350679..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.driver
-
-/**
- * An event that is emitted by a [Host].
- */
-public sealed class HostEvent {
- /**
- * The driver that emitted the event.
- */
- public abstract val driver: Host
-
- /**
- * This event is emitted when the number of active servers on the server managed by this driver is updated.
- *
- * @property driver The driver that emitted the event.
- * @property numberOfActiveServers The number of active servers.
- * @property availableMemory The available memory, in MB.
- */
- public data class VmsUpdated(
- override val driver: Host,
- public val numberOfActiveServers: Int,
- public val availableMemory: Long
- ) : HostEvent()
-
- /**
- * This event is emitted when a slice is finished.
- *
- * @property driver The driver that emitted the event.
- * @property requestedBurst The total requested CPU time (can be above capacity).
- * @property grantedBurst The actual total granted capacity, which might be lower than the requested burst due to
- * the hypervisor being interrupted during a slice.
- * @property overcommissionedBurst The CPU time that the hypervisor could not grant to the virtual machine since
- * it did not have the capacity.
- * @property interferedBurst The sum of CPU time that virtual machines could not utilize due to performance
- * interference.
- * @property cpuUsage CPU use in megahertz.
- * @property cpuDemand CPU demand in megahertz.
- * @property numberOfDeployedImages The number of images deployed on this hypervisor.
- */
- public data class SliceFinished(
- override val driver: Host,
- public val requestedBurst: Long,
- public val grantedBurst: Long,
- public val overcommissionedBurst: Long,
- public val interferedBurst: Long,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val numberOfDeployedImages: Int,
- ) : HostEvent()
-}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt
deleted file mode 100644
index a7974062..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.events
-
-import org.opendc.trace.core.Event
-import java.util.*
-
-/**
- * This event is emitted when a hypervisor has become available.
- */
-public class HypervisorAvailableEvent(public val uid: UUID) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt
deleted file mode 100644
index 75bb09ed..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.events
-
-import org.opendc.trace.core.Event
-import java.util.*
-
-/**
- * This event is emitted when a hypervisor has become unavailable.
- */
-public class HypervisorUnavailableEvent(public val uid: UUID) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt
deleted file mode 100644
index f59c74b7..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.events
-
-import org.opendc.trace.core.Event
-
-/**
- * This event is emitted when a virtual machine has successfully been scheduled on a hypervisor.
- */
-public class VmScheduledEvent(public val name: String) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt
deleted file mode 100644
index eaf0736b..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.events
-
-import org.opendc.trace.core.Event
-
-/**
- * This event is emitted when a virtual machine has stopped running.
- */
-public class VmStoppedEvent(public val name: String) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt
deleted file mode 100644
index fa0a8a13..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.events
-
-import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Image
-import org.opendc.trace.core.Event
-
-/**
- * This event is emitted when a virtual machine is submitted to the provisioning service.
- */
-public class VmSubmissionEvent(public val name: String, public val image: Image, public val flavor: Flavor) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt
deleted file mode 100644
index 52b91616..00000000
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.service.events
-
-import org.opendc.trace.core.Event
-
-/**
- * An event that is emitted when the submission is deemed to be invalid.
- */
-public class VmSubmissionInvalidEvent(public val name: String) : Event()
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt
index 29f10e27..4a8d3046 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt
@@ -59,4 +59,10 @@ internal class ClientFlavor(private val delegate: Flavor) : Flavor {
labels = delegate.labels
meta = delegate.meta
}
+
+ override fun equals(other: Any?): Boolean = other is Flavor && other.uid == uid
+
+ override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "Flavor[uid=$uid,name=$name]"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt
index 6c5b2ab0..e0b5c171 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt
@@ -52,4 +52,10 @@ internal class ClientImage(private val delegate: Image) : Image {
labels = delegate.labels
meta = delegate.meta
}
+
+ override fun equals(other: Any?): Boolean = other is Image && other.uid == uid
+
+ override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "Image[uid=$uid,name=$name]"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
index ae4cee3b..f2929bf3 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
@@ -104,4 +104,10 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche
watcher.onStateChanged(this, newState)
}
}
+
+ override fun equals(other: Any?): Boolean = other is Server && other.uid == uid
+
+ override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "Server[uid=$uid,name=$name,state=$state]"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index aa7e0aa1..26a34ad9 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,20 +22,16 @@
package org.opendc.compute.service.internal
+import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.Flow
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.events.*
import org.opendc.compute.service.scheduler.AllocationPolicy
-import org.opendc.trace.core.EventTracer
import org.opendc.utils.TimerScheduler
-import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -47,17 +43,17 @@ import kotlin.math.max
* @param context The [CoroutineContext] to use.
* @param clock The clock instance to keep track of time.
*/
-public class ComputeServiceImpl(
+internal class ComputeServiceImpl(
private val context: CoroutineContext,
private val clock: Clock,
- private val tracer: EventTracer,
+ private val meter: Meter,
private val allocationPolicy: AllocationPolicy,
private val schedulingQuantum: Long
) : ComputeService, HostListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- private val scope = CoroutineScope(context)
+ private val scope = CoroutineScope(context + Job())
/**
* The logger instance of this server.
@@ -104,24 +100,70 @@ public class ComputeServiceImpl(
*/
private val servers = mutableMapOf<UUID, InternalServer>()
- public var submittedVms: Int = 0
- public var queuedVms: Int = 0
- public var runningVms: Int = 0
- public var finishedVms: Int = 0
- public var unscheduledVms: Int = 0
-
private var maxCores = 0
private var maxMemory = 0L
/**
+ * The number of servers that have been submitted to the service for provisioning.
+ */
+ private val _submittedServers = meter.longCounterBuilder("servers.submitted")
+ .setDescription("Number of start requests")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that failed to be scheduled.
+ */
+ private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled")
+ .setDescription("Number of unscheduled servers")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that are waiting to be provisioned.
+ */
+ private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting")
+ .setDescription("Number of servers waiting to be provisioned")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that are currently running.
+ */
+ private val _runningServers = meter.longUpDownCounterBuilder("servers.active")
+ .setDescription("Number of servers currently running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of servers that have finished running.
+ */
+ private val _finishedServers = meter.longCounterBuilder("servers.finished")
+ .setDescription("Number of servers that finished running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of hosts registered at the compute service.
+ */
+ private val _hostCount = meter.longUpDownCounterBuilder("hosts.total")
+ .setDescription("Number of hosts")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of available hosts registered at the compute service.
+ */
+ private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available")
+ .setDescription("Number of available hosts")
+ .setUnit("1")
+ .build()
+
+ /**
* The allocation logic to use.
*/
private val allocationLogic = allocationPolicy()
- override val events: Flow<ComputeServiceEvent>
- get() = _events
- private val _events = EventFlow<ComputeServiceEvent>()
-
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
@@ -133,130 +175,119 @@ public class ComputeServiceImpl(
override val hostCount: Int
get() = hostToView.size
- override fun newClient(): ComputeClient = object : ComputeClient {
- private var isClosed: Boolean = false
+ override fun newClient(): ComputeClient {
+ check(scope.isActive) { "Service is already closed" }
+ return object : ComputeClient {
+ private var isClosed: Boolean = false
- override suspend fun queryFlavors(): List<Flavor> {
- check(!isClosed) { "Client is already closed" }
+ override suspend fun queryFlavors(): List<Flavor> {
+ check(!isClosed) { "Client is already closed" }
- return flavors.values.map { ClientFlavor(it) }
- }
+ return flavors.values.map { ClientFlavor(it) }
+ }
- override suspend fun findFlavor(id: UUID): Flavor? {
- check(!isClosed) { "Client is already closed" }
+ override suspend fun findFlavor(id: UUID): Flavor? {
+ check(!isClosed) { "Client is already closed" }
- return flavors[id]?.let { ClientFlavor(it) }
- }
+ return flavors[id]?.let { ClientFlavor(it) }
+ }
- override suspend fun newFlavor(
- name: String,
- cpuCount: Int,
- memorySize: Long,
- labels: Map<String, String>,
- meta: Map<String, Any>
- ): Flavor {
- check(!isClosed) { "Client is already closed" }
-
- val uid = UUID(clock.millis(), random.nextLong())
- val flavor = InternalFlavor(
- this@ComputeServiceImpl,
- uid,
- name,
- cpuCount,
- memorySize,
- labels,
- meta
- )
-
- flavors[uid] = flavor
-
- return ClientFlavor(flavor)
- }
+ override suspend fun newFlavor(
+ name: String,
+ cpuCount: Int,
+ memorySize: Long,
+ labels: Map<String, String>,
+ meta: Map<String, Any>
+ ): Flavor {
+ check(!isClosed) { "Client is already closed" }
+
+ val uid = UUID(clock.millis(), random.nextLong())
+ val flavor = InternalFlavor(
+ this@ComputeServiceImpl,
+ uid,
+ name,
+ cpuCount,
+ memorySize,
+ labels,
+ meta
+ )
- override suspend fun queryImages(): List<Image> {
- check(!isClosed) { "Client is already closed" }
+ flavors[uid] = flavor
- return images.values.map { ClientImage(it) }
- }
+ return ClientFlavor(flavor)
+ }
- override suspend fun findImage(id: UUID): Image? {
- check(!isClosed) { "Client is already closed" }
+ override suspend fun queryImages(): List<Image> {
+ check(!isClosed) { "Client is already closed" }
- return images[id]?.let { ClientImage(it) }
- }
+ return images.values.map { ClientImage(it) }
+ }
- override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image {
- check(!isClosed) { "Client is already closed" }
+ override suspend fun findImage(id: UUID): Image? {
+ check(!isClosed) { "Client is already closed" }
- val uid = UUID(clock.millis(), random.nextLong())
- val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta)
+ return images[id]?.let { ClientImage(it) }
+ }
- images[uid] = image
+ override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image {
+ check(!isClosed) { "Client is already closed" }
- return ClientImage(image)
- }
+ val uid = UUID(clock.millis(), random.nextLong())
+ val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta)
- override suspend fun newServer(
- name: String,
- image: Image,
- flavor: Flavor,
- labels: Map<String, String>,
- meta: Map<String, Any>,
- start: Boolean
- ): Server {
- check(!isClosed) { "Client is closed" }
- tracer.commit(VmSubmissionEvent(name, image, flavor))
-
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
+ images[uid] = image
+
+ return ClientImage(image)
+ }
+
+ override suspend fun newServer(
+ name: String,
+ image: Image,
+ flavor: Flavor,
+ labels: Map<String, String>,
+ meta: Map<String, Any>,
+ start: Boolean
+ ): Server {
+ check(!isClosed) { "Client is closed" }
+
+ val uid = UUID(clock.millis(), random.nextLong())
+ val server = InternalServer(
this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- ++submittedVms,
- runningVms,
- finishedVms,
- ++queuedVms,
- unscheduledVms
+ uid,
+ name,
+ requireNotNull(flavors[flavor.uid]) { "Unknown flavor" },
+ requireNotNull(images[image.uid]) { "Unknown image" },
+ labels.toMutableMap(),
+ meta.toMutableMap()
)
- )
-
- val uid = UUID(clock.millis(), random.nextLong())
- val server = InternalServer(
- this@ComputeServiceImpl,
- uid,
- name,
- flavor,
- image,
- labels.toMutableMap(),
- meta.toMutableMap()
- )
-
- servers[uid] = server
-
- if (start) {
- server.start()
+
+ servers[uid] = server
+
+ if (start) {
+ server.start()
+ }
+
+ return ClientServer(server)
}
- return ClientServer(server)
- }
+ override suspend fun findServer(id: UUID): Server? {
+ check(!isClosed) { "Client is already closed" }
- override suspend fun findServer(id: UUID): Server? {
- check(!isClosed) { "Client is already closed" }
+ return servers[id]?.let { ClientServer(it) }
+ }
- return servers[id]?.let { ClientServer(it) }
- }
+ override suspend fun queryServers(): List<Server> {
+ check(!isClosed) { "Client is already closed" }
- override suspend fun queryServers(): List<Server> {
- check(!isClosed) { "Client is already closed" }
+ return servers.values.map { ClientServer(it) }
+ }
- return servers.values.map { ClientServer(it) }
- }
+ override fun close() {
+ isClosed = true
+ }
- override fun close() {
- isClosed = true
+ override fun toString(): String = "ComputeClient"
}
-
- override fun toString(): String = "ComputeClient"
}
override fun addHost(host: Host) {
@@ -271,37 +302,50 @@ public class ComputeServiceImpl(
hostToView[host] = hv
if (host.state == HostState.UP) {
+ _availableHostCount.add(1)
availableHosts += hv
}
+ _hostCount.add(1)
host.addListener(this)
}
override fun removeHost(host: Host) {
- host.removeListener(this)
+ val view = hostToView.remove(host)
+ if (view != null) {
+ if (availableHosts.remove(view)) {
+ _availableHostCount.add(-1)
+ }
+ host.removeListener(this)
+ _hostCount.add(-1)
+ }
}
override fun close() {
scope.cancel()
}
- internal fun schedule(server: InternalServer) {
+ internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
- queue.add(SchedulingRequest(server))
+ val request = SchedulingRequest(server)
+ queue.add(request)
+ _submittedServers.add(1)
+ _waitingServers.add(1)
requestSchedulingCycle()
+ return request
}
internal fun delete(flavor: InternalFlavor) {
- checkNotNull(flavors.remove(flavor.uid)) { "Flavor was not known" }
+ flavors.remove(flavor.uid)
}
internal fun delete(image: InternalImage) {
- checkNotNull(images.remove(image.uid)) { "Image was not known" }
+ images.remove(image.uid)
}
internal fun delete(server: InternalServer) {
- checkNotNull(servers.remove(server.uid)) { "Server was not known" }
+ servers.remove(server.uid)
}
/**
@@ -332,34 +376,24 @@ public class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
+ _waitingServers.add(-1)
continue
}
val server = request.server
val hv = allocationLogic.select(availableHosts, request.server)
if (hv == null || !hv.host.canFit(server)) {
- logger.trace { "Server $server selected for scheduling but no capacity available for it." }
+ logger.trace { "Server $server selected for scheduling but no capacity available for it at the moment" }
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
- tracer.commit(VmSubmissionInvalidEvent(server.name))
-
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- runningVms,
- finishedVms,
- --queuedVms,
- ++unscheduledVms
- )
- )
-
// Remove the incoming image
queue.poll()
+ _waitingServers.add(-1)
+ _unscheduledServers.add(1)
logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
+
+ server.state = ServerState.ERROR
continue
} else {
break
@@ -370,44 +404,28 @@ public class ComputeServiceImpl(
// Remove request from queue
queue.poll()
+ _waitingServers.add(-1)
logger.info { "Assigned server $server to host $host." }
- try {
- // Speculatively update the hypervisor view information to prevent other images in the queue from
- // deciding on stale values.
- hv.numberOfActiveServers++
- hv.provisionedCores += server.flavor.cpuCount
- hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack
-
- scope.launch {
- try {
- server.assignHost(host)
- host.spawn(server)
- activeServers[server] = host
-
- tracer.commit(VmScheduledEvent(server.name))
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- )
- )
- } catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
-
- hv.numberOfActiveServers--
- hv.provisionedCores -= server.flavor.cpuCount
- hv.availableMemory += server.flavor.memorySize
- }
+
+ // Speculatively update the hypervisor view information to prevent other images in the queue from
+ // deciding on stale values.
+ hv.numberOfActiveServers++
+ hv.provisionedCores += server.flavor.cpuCount
+ hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack
+
+ scope.launch {
+ try {
+ server.host = host
+ host.spawn(server)
+ activeServers[server] = host
+ } catch (e: Throwable) {
+ logger.error("Failed to deploy VM", e)
+
+ hv.numberOfActiveServers--
+ hv.provisionedCores -= server.flavor.cpuCount
+ hv.availableMemory += server.flavor.memorySize
}
- } catch (e: Exception) {
- logger.warn(e) { "Failed to assign server $server to $host. " }
}
}
}
@@ -415,7 +433,7 @@ public class ComputeServiceImpl(
/**
* A request to schedule an [InternalServer] onto one of the [Host]s.
*/
- private data class SchedulingRequest(val server: InternalServer) {
+ internal data class SchedulingRequest(val server: InternalServer) {
/**
* A flag to indicate that the request is cancelled.
*/
@@ -431,23 +449,9 @@ public class ComputeServiceImpl(
if (hv != null) {
// Corner case for when the hypervisor already exists
availableHosts += hv
+ _availableHostCount.add(1)
}
- tracer.commit(HypervisorAvailableEvent(host.uid))
-
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
// Re-schedule on the new machine
requestSchedulingCycle()
}
@@ -456,21 +460,7 @@ public class ComputeServiceImpl(
val hv = hostToView[host] ?: return
availableHosts -= hv
-
- tracer.commit(HypervisorUnavailableEvent(hv.uid))
-
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
+ _availableHostCount.add(-1)
requestSchedulingCycle()
}
@@ -488,25 +478,15 @@ public class ComputeServiceImpl(
server.state = newState
- if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
+ if (newState == ServerState.RUNNING) {
+ _runningServers.add(1)
+ } else if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
- tracer.commit(VmStoppedEvent(server.name))
-
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
activeServers -= server
+ _runningServers.add(-1)
+ _finishedServers.add(1)
+
val hv = hostToView[host]
if (hv != null) {
hv.provisionedCores -= server.flavor.cpuCount
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
index 1bdfdf1a..5793541f 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
@@ -32,4 +32,6 @@ public class HostView(public val host: Host) {
public var numberOfActiveServers: Int = 0
public var availableMemory: Long = host.model.memorySize
public var provisionedCores: Int = 0
+
+ override fun toString(): String = "HostView[host=$host]"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt
index 95e280df..b8fb6279 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt
@@ -58,7 +58,9 @@ internal class InternalFlavor(
service.delete(this)
}
- override fun equals(other: Any?): Boolean = other is InternalFlavor && uid == other.uid
+ override fun equals(other: Any?): Boolean = other is Flavor && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "Flavor[uid=$uid,name=$name]"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt
index 86f2f6b9..d9ed5896 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt
@@ -48,7 +48,9 @@ internal class InternalImage(
service.delete(this)
}
- override fun equals(other: Any?): Boolean = other is InternalImage && uid == other.uid
+ override fun equals(other: Any?): Boolean = other is Image && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "Image[uid=$uid,name=$name]"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index ff7c1d15..d9d0f3fc 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -34,8 +34,8 @@ internal class InternalServer(
private val service: ComputeServiceImpl,
override val uid: UUID,
override val name: String,
- override val flavor: Flavor,
- override val image: Image,
+ override val flavor: InternalFlavor,
+ override val image: InternalImage,
override val labels: MutableMap<String, String>,
override val meta: MutableMap<String, Any>
) : Server {
@@ -54,6 +54,11 @@ internal class InternalServer(
*/
internal var host: Host? = null
+ /**
+ * The current scheduling request.
+ */
+ private var request: ComputeServiceImpl.SchedulingRequest? = null
+
override suspend fun start() {
when (state) {
ServerState.RUNNING -> {
@@ -66,35 +71,43 @@ internal class InternalServer(
}
ServerState.DELETED -> {
logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
+ throw IllegalStateException("Server is terminated")
}
else -> {
logger.info { "User requested to start server $uid" }
state = ServerState.PROVISIONING
- service.schedule(this)
+ assert(request == null) { "Scheduling request already active" }
+ request = service.schedule(this)
}
}
}
override suspend fun stop() {
when (state) {
- ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
+ ServerState.PROVISIONING -> {
+ cancelProvisioningRequest()
+ state = ServerState.TERMINATED
+ }
ServerState.RUNNING, ServerState.ERROR -> {
val host = checkNotNull(host) { "Server not running" }
host.stop(this)
}
- ServerState.TERMINATED -> {} // No work needed
- ServerState.DELETED -> throw IllegalStateException("Server is terminated")
+ ServerState.TERMINATED, ServerState.DELETED -> {} // No work needed
}
}
override suspend fun delete() {
when (state) {
- ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
- ServerState.RUNNING -> {
+ ServerState.PROVISIONING, ServerState.TERMINATED -> {
+ cancelProvisioningRequest()
+ service.delete(this)
+ state = ServerState.DELETED
+ }
+ ServerState.RUNNING, ServerState.ERROR -> {
val host = checkNotNull(host) { "Server not running" }
host.delete(this)
service.delete(this)
+ state = ServerState.DELETED
}
else -> {} // No work needed
}
@@ -121,11 +134,20 @@ internal class InternalServer(
field = value
}
- internal fun assignHost(host: Host) {
- this.host = host
+ /**
+ * Cancel the provisioning request if active.
+ */
+ private fun cancelProvisioningRequest() {
+ val request = request
+ if (request != null) {
+ this.request = null
+ request.isCancelled = true
+ }
}
- override fun equals(other: Any?): Boolean = other is InternalServer && uid == other.uid
+ override fun equals(other: Any?): Boolean = other is Server && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "Server[uid=$uid,state=$state]"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt
index ed1dc662..2c953f8b 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt
@@ -20,14 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.allocation
+package org.opendc.compute.service.scheduler
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.service.internal.HostView
-import org.opendc.compute.service.scheduler.AllocationPolicy
-
-private val logger = KotlinLogging.logger {}
/**
* Policy replaying VM-cluster assignment.
@@ -36,6 +33,8 @@ private val logger = KotlinLogging.logger {}
* assigned the VM image.
*/
public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String>) : AllocationPolicy {
+ private val logger = KotlinLogging.logger {}
+
override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic {
override fun select(
hypervisors: Set<HostView>,
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
new file mode 100644
index 00000000..45a306aa
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -0,0 +1,390 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service
+
+import io.mockk.*
+import io.opentelemetry.api.metrics.MeterProvider
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.compute.api.*
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostListener
+import org.opendc.compute.service.driver.HostModel
+import org.opendc.compute.service.driver.HostState
+import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import java.util.*
+
+/**
+ * Test suite for the [ComputeService] interface.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class ComputeServiceTest {
+ lateinit var scope: TestCoroutineScope
+ lateinit var service: ComputeService
+
+ @BeforeEach
+ fun setUp() {
+ scope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(scope)
+ val policy = AvailableMemoryAllocationPolicy()
+ val meter = MeterProvider.noop().get("opendc-compute")
+ service = ComputeService(scope.coroutineContext, clock, meter, policy)
+ }
+
+ @AfterEach
+ fun tearDown() {
+ scope.cleanupTestCoroutines()
+ }
+
+ @Test
+ fun testClientClose() = scope.runBlockingTest {
+ val client = service.newClient()
+
+ assertEquals(emptyList<Flavor>(), client.queryFlavors())
+ assertEquals(emptyList<Image>(), client.queryImages())
+ assertEquals(emptyList<Server>(), client.queryServers())
+
+ client.close()
+
+ assertThrows<IllegalStateException> { client.queryFlavors() }
+ assertThrows<IllegalStateException> { client.queryImages() }
+ assertThrows<IllegalStateException> { client.queryServers() }
+
+ assertThrows<IllegalStateException> { client.findFlavor(UUID.randomUUID()) }
+ assertThrows<IllegalStateException> { client.findImage(UUID.randomUUID()) }
+ assertThrows<IllegalStateException> { client.findServer(UUID.randomUUID()) }
+
+ assertThrows<IllegalStateException> { client.newFlavor("test", 1, 2) }
+ assertThrows<IllegalStateException> { client.newImage("test") }
+ assertThrows<IllegalStateException> { client.newServer("test", mockk(), mockk()) }
+ }
+
+ @Test
+ fun testClientCreate() = scope.runBlockingTest {
+ val client = service.newClient()
+
+ val flavor = client.newFlavor("test", 1, 1024)
+ assertEquals(listOf(flavor), client.queryFlavors())
+ assertEquals(flavor, client.findFlavor(flavor.uid))
+ val image = client.newImage("test")
+ assertEquals(listOf(image), client.queryImages())
+ assertEquals(image, client.findImage(image.uid))
+ val server = client.newServer("test", image, flavor, start = false)
+ assertEquals(listOf(server), client.queryServers())
+ assertEquals(server, client.findServer(server.uid))
+
+ server.delete()
+ assertNull(client.findServer(server.uid))
+
+ image.delete()
+ assertNull(client.findImage(image.uid))
+
+ flavor.delete()
+ assertNull(client.findFlavor(flavor.uid))
+
+ assertThrows<IllegalStateException> { server.start() }
+ }
+
+ @Test
+ fun testClientOnClose() = scope.runBlockingTest {
+ service.close()
+ assertThrows<IllegalStateException> {
+ service.newClient()
+ }
+ }
+
+ @Test
+ fun testAddHost() = scope.runBlockingTest {
+ val host = mockk<Host>(relaxUnitFun = true)
+
+ every { host.model } returns HostModel(4, 2048)
+ every { host.state } returns HostState.UP
+
+ assertEquals(0, service.hostCount)
+ assertEquals(emptySet<Host>(), service.hosts)
+
+ service.addHost(host)
+
+ verify(exactly = 1) { host.addListener(any()) }
+
+ assertEquals(1, service.hostCount)
+ assertEquals(1, service.hosts.size)
+
+ service.removeHost(host)
+
+ verify(exactly = 1) { host.removeListener(any()) }
+ }
+
+ @Test
+ fun testAddHostDouble() = scope.runBlockingTest {
+ val host = mockk<Host>(relaxUnitFun = true)
+
+ every { host.model } returns HostModel(4, 2048)
+ every { host.state } returns HostState.DOWN
+
+ assertEquals(0, service.hostCount)
+ assertEquals(emptySet<Host>(), service.hosts)
+
+ service.addHost(host)
+ service.addHost(host)
+
+ verify(exactly = 1) { host.addListener(any()) }
+ }
+
+ @Test
+ fun testServerStartWithoutEnoughCpus() = scope.runBlockingTest {
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 1, 0)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+
+ server.start()
+ delay(5 * 60 * 1000)
+ server.refresh()
+ assertEquals(ServerState.ERROR, server.state)
+ }
+
+ @Test
+ fun testServerStartWithoutEnoughMemory() = scope.runBlockingTest {
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 0, 1024)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+
+ server.start()
+ delay(5 * 60 * 1000)
+ server.refresh()
+ assertEquals(ServerState.ERROR, server.state)
+ }
+
+ @Test
+ fun testServerStartWithoutEnoughResources() = scope.runBlockingTest {
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 1, 1024)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+
+ server.start()
+ delay(5 * 60 * 1000)
+ server.refresh()
+ assertEquals(ServerState.ERROR, server.state)
+ }
+
+ @Test
+ fun testServerCancelRequest() = scope.runBlockingTest {
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 1, 1024)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+
+ server.start()
+ server.stop()
+ delay(5 * 60 * 1000)
+ server.refresh()
+ assertEquals(ServerState.TERMINATED, server.state)
+ }
+
+ @Test
+ fun testServerCannotFitOnHost() = scope.runBlockingTest {
+ val host = mockk<Host>(relaxUnitFun = true)
+
+ every { host.model } returns HostModel(4, 2048)
+ every { host.state } returns HostState.UP
+ every { host.canFit(any()) } returns false
+
+ service.addHost(host)
+
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 1, 1024)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+
+ server.start()
+ delay(10 * 60 * 1000)
+ server.refresh()
+ assertEquals(ServerState.PROVISIONING, server.state)
+
+ verify { host.canFit(server) }
+ }
+
+ @Test
+ fun testHostAvailableAfterSomeTime() = scope.runBlockingTest {
+ val host = mockk<Host>(relaxUnitFun = true)
+ val listeners = mutableListOf<HostListener>()
+
+ every { host.uid } returns UUID.randomUUID()
+ every { host.model } returns HostModel(4, 2048)
+ every { host.state } returns HostState.DOWN
+ every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
+ every { host.canFit(any()) } returns false
+
+ service.addHost(host)
+
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 1, 1024)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+
+ server.start()
+ delay(5 * 60 * 1000)
+
+ listeners.forEach { it.onStateChanged(host, HostState.UP) }
+
+ delay(5 * 60 * 1000)
+ server.refresh()
+ assertEquals(ServerState.PROVISIONING, server.state)
+
+ verify { host.canFit(server) }
+ }
+
+ @Test
+ fun testHostUnavailableAfterSomeTime() = scope.runBlockingTest {
+ val host = mockk<Host>(relaxUnitFun = true)
+ val listeners = mutableListOf<HostListener>()
+
+ every { host.uid } returns UUID.randomUUID()
+ every { host.model } returns HostModel(4, 2048)
+ every { host.state } returns HostState.UP
+ every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
+ every { host.canFit(any()) } returns false
+
+ service.addHost(host)
+
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 1, 1024)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+
+ delay(5 * 60 * 1000)
+
+ listeners.forEach { it.onStateChanged(host, HostState.DOWN) }
+
+ server.start()
+ delay(5 * 60 * 1000)
+ server.refresh()
+ assertEquals(ServerState.PROVISIONING, server.state)
+
+ verify(exactly = 0) { host.canFit(server) }
+ }
+
+ @Test
+ fun testServerInvalidType() = scope.runBlockingTest {
+ val host = mockk<Host>(relaxUnitFun = true)
+ val listeners = mutableListOf<HostListener>()
+
+ every { host.uid } returns UUID.randomUUID()
+ every { host.model } returns HostModel(4, 2048)
+ every { host.state } returns HostState.UP
+ every { host.canFit(any()) } returns true
+ every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
+
+ service.addHost(host)
+
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 1, 1024)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+
+ assertThrows<IllegalArgumentException> {
+ listeners.forEach { it.onStateChanged(host, server, ServerState.RUNNING) }
+ }
+ }
+
+ @Test
+ fun testServerDeploy() = scope.runBlockingTest {
+ val host = mockk<Host>(relaxUnitFun = true)
+ val listeners = mutableListOf<HostListener>()
+
+ every { host.uid } returns UUID.randomUUID()
+ every { host.model } returns HostModel(4, 2048)
+ every { host.state } returns HostState.UP
+ every { host.canFit(any()) } returns true
+ every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
+
+ service.addHost(host)
+
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 1, 1024)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+ val slot = slot<Server>()
+
+ val watcher = mockk<ServerWatcher>(relaxUnitFun = true)
+ server.watch(watcher)
+
+ // Start server
+ server.start()
+ delay(5 * 60 * 1000)
+ coVerify { host.spawn(capture(slot), true) }
+
+ listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.RUNNING) }
+
+ server.refresh()
+ assertEquals(ServerState.RUNNING, server.state)
+
+ verify { watcher.onStateChanged(server, ServerState.RUNNING) }
+
+ // Stop server
+ listeners.forEach { it.onStateChanged(host, slot.captured, ServerState.TERMINATED) }
+
+ server.refresh()
+ assertEquals(ServerState.TERMINATED, server.state)
+
+ verify { watcher.onStateChanged(server, ServerState.TERMINATED) }
+ }
+
+ @Test
+ fun testServerDeployFailure() = scope.runBlockingTest {
+ val host = mockk<Host>(relaxUnitFun = true)
+ val listeners = mutableListOf<HostListener>()
+
+ every { host.uid } returns UUID.randomUUID()
+ every { host.model } returns HostModel(4, 2048)
+ every { host.state } returns HostState.UP
+ every { host.canFit(any()) } returns true
+ every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
+ coEvery { host.spawn(any(), true) } throws IllegalStateException()
+
+ service.addHost(host)
+
+ val client = service.newClient()
+ val flavor = client.newFlavor("test", 1, 1024)
+ val image = client.newImage("test")
+ val server = client.newServer("test", image, flavor, start = false)
+
+ server.start()
+ delay(5 * 60 * 1000)
+
+ server.refresh()
+ assertEquals(ServerState.PROVISIONING, server.state)
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt
new file mode 100644
index 00000000..18d698c6
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalFlavorTest.kt
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service
+
+import io.mockk.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
+import org.junit.jupiter.api.Test
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.internal.InternalFlavor
+import java.util.*
+
+/**
+ * Test suite for the [InternalFlavor] implementation.
+ */
+class InternalFlavorTest {
+ @Test
+ fun testEquality() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf())
+ val b = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf())
+
+ assertEquals(a, b)
+ }
+
+ @Test
+ fun testEqualityWithDifferentType() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf())
+
+ val b = mockk<Flavor>(relaxUnitFun = true)
+ every { b.uid } returns uid
+
+ assertEquals(a, b)
+ }
+
+ @Test
+ fun testInequalityWithDifferentType() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf())
+
+ val b = mockk<Flavor>(relaxUnitFun = true)
+ every { b.uid } returns UUID.randomUUID()
+
+ assertNotEquals(a, b)
+ }
+
+ @Test
+ fun testInequalityWithIncorrectType() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val a = InternalFlavor(service, uid, "test", 1, 1024, mutableMapOf(), mutableMapOf())
+
+ assertNotEquals(a, Unit)
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt
new file mode 100644
index 00000000..e1cb0128
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalImageTest.kt
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service
+
+import io.mockk.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
+import org.junit.jupiter.api.Test
+import org.opendc.compute.api.Image
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.internal.InternalFlavor
+import org.opendc.compute.service.internal.InternalImage
+import java.util.*
+
+/**
+ * Test suite for the [InternalImage] implementation.
+ */
+class InternalImageTest {
+ @Test
+ fun testEquality() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf())
+ val b = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf())
+
+ assertEquals(a, b)
+ }
+
+ @Test
+ fun testEqualityWithDifferentType() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf())
+
+ val b = mockk<Image>(relaxUnitFun = true)
+ every { b.uid } returns uid
+
+ assertEquals(a, b)
+ }
+
+ @Test
+ fun testInequalityWithDifferentType() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf())
+
+ val b = mockk<Image>(relaxUnitFun = true)
+ every { b.uid } returns UUID.randomUUID()
+
+ assertNotEquals(a, b)
+ }
+
+ @Test
+ fun testInequalityWithIncorrectType() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val a = InternalImage(service, uid, "test", mutableMapOf(), mutableMapOf())
+
+ assertNotEquals(a, Unit)
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
new file mode 100644
index 00000000..81cb45df
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/InternalServerTest.kt
@@ -0,0 +1,285 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service
+
+import io.mockk.*
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.test.runBlockingTest
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.internal.InternalFlavor
+import org.opendc.compute.service.internal.InternalImage
+import org.opendc.compute.service.internal.InternalServer
+import java.util.*
+
+/**
+ * Test suite for the [InternalServer] implementation.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+class InternalServerTest {
+ @Test
+ fun testEquality() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+ val b = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ assertEquals(a, b)
+ }
+
+ @Test
+ fun testEqualityWithDifferentType() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ val b = mockk<Server>(relaxUnitFun = true)
+ every { b.uid } returns uid
+
+ assertEquals(a, b)
+ }
+
+ @Test
+ fun testInequalityWithDifferentType() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ val b = mockk<Server>(relaxUnitFun = true)
+ every { b.uid } returns UUID.randomUUID()
+
+ assertNotEquals(a, b)
+ }
+
+ @Test
+ fun testInequalityWithIncorrectType() {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val a = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ assertNotEquals(a, Unit)
+ }
+
+ @Test
+ fun testStartTerminatedServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ every { service.schedule(any()) } answers { ComputeServiceImpl.SchedulingRequest(it.invocation.args[0] as InternalServer) }
+
+ server.start()
+
+ verify(exactly = 1) { service.schedule(server) }
+ assertEquals(ServerState.PROVISIONING, server.state)
+ }
+
+ @Test
+ fun testStartDeletedServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ server.state = ServerState.DELETED
+
+ assertThrows<IllegalStateException> { server.start() }
+ }
+
+ @Test
+ fun testStartProvisioningServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ server.state = ServerState.PROVISIONING
+
+ server.start()
+
+ assertEquals(ServerState.PROVISIONING, server.state)
+ }
+
+ @Test
+ fun testStartRunningServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ server.state = ServerState.RUNNING
+
+ server.start()
+
+ assertEquals(ServerState.RUNNING, server.state)
+ }
+
+ @Test
+ fun testStopProvisioningServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+ val request = ComputeServiceImpl.SchedulingRequest(server)
+
+ every { service.schedule(any()) } returns request
+
+ server.start()
+ server.stop()
+
+ assertTrue(request.isCancelled)
+ assertEquals(ServerState.TERMINATED, server.state)
+ }
+
+ @Test
+ fun testStopTerminatedServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ server.state = ServerState.TERMINATED
+ server.stop()
+
+ assertEquals(ServerState.TERMINATED, server.state)
+ }
+
+ @Test
+ fun testStopDeletedServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ server.state = ServerState.DELETED
+ server.stop()
+
+ assertEquals(ServerState.DELETED, server.state)
+ }
+
+ @Test
+ fun testStopRunningServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>()
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+ val host = mockk<Host>(relaxUnitFun = true)
+
+ server.state = ServerState.RUNNING
+ server.host = host
+ server.stop()
+ yield()
+
+ coVerify { host.stop(server) }
+ }
+
+ @Test
+ fun testDeleteProvisioningServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+ val request = ComputeServiceImpl.SchedulingRequest(server)
+
+ every { service.schedule(any()) } returns request
+
+ server.start()
+ server.delete()
+
+ assertTrue(request.isCancelled)
+ assertEquals(ServerState.DELETED, server.state)
+ verify { service.delete(server) }
+ }
+
+ @Test
+ fun testDeleteTerminatedServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ server.state = ServerState.TERMINATED
+ server.delete()
+
+ assertEquals(ServerState.DELETED, server.state)
+
+ verify { service.delete(server) }
+ }
+
+ @Test
+ fun testDeleteDeletedServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+
+ server.state = ServerState.DELETED
+ server.delete()
+
+ assertEquals(ServerState.DELETED, server.state)
+ }
+
+ @Test
+ fun testDeleteRunningServer() = runBlockingTest {
+ val service = mockk<ComputeServiceImpl>(relaxUnitFun = true)
+ val uid = UUID.randomUUID()
+ val flavor = mockk<InternalFlavor>()
+ val image = mockk<InternalImage>()
+ val server = InternalServer(service, uid, "test", flavor, image, mutableMapOf(), mutableMapOf())
+ val host = mockk<Host>(relaxUnitFun = true)
+
+ server.state = ServerState.RUNNING
+ server.host = host
+ server.delete()
+ yield()
+
+ coVerify { host.delete(server) }
+ verify { service.delete(server) }
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt
new file mode 100644
index 00000000..db377914
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/AllocationPolicyTest.kt
@@ -0,0 +1,219 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler
+
+import io.mockk.every
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.Arguments
+import org.junit.jupiter.params.provider.MethodSource
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+import java.util.*
+import java.util.stream.Stream
+import kotlin.random.Random
+
+/**
+ * Test suite for the [AllocationPolicy] interface.
+ */
+internal class AllocationPolicyTest {
+ @ParameterizedTest
+ @MethodSource("activeServersArgs")
+ fun testActiveServersPolicy(
+ reversed: Boolean,
+ hosts: Set<HostView>,
+ server: Server,
+ expectedHost: HostView?
+ ) {
+ val policy = NumberOfActiveServersAllocationPolicy(reversed)
+ assertEquals(expectedHost, policy.invoke().select(hosts, server))
+ }
+
+ @ParameterizedTest
+ @MethodSource("availableMemoryArgs")
+ fun testAvailableMemoryPolicy(
+ reversed: Boolean,
+ hosts: Set<HostView>,
+ server: Server,
+ expectedHost: HostView?
+ ) {
+ val policy = AvailableMemoryAllocationPolicy(reversed)
+ assertEquals(expectedHost, policy.invoke().select(hosts, server))
+ }
+
+ @ParameterizedTest
+ @MethodSource("availableCoreMemoryArgs")
+ fun testAvailableCoreMemoryPolicy(
+ reversed: Boolean,
+ hosts: Set<HostView>,
+ server: Server,
+ expectedHost: HostView?
+ ) {
+ val policy = AvailableMemoryAllocationPolicy(reversed) // NOTE(review): test name/args suggest AvailableCoreMemoryAllocationPolicy was intended - confirm and re-derive expected hosts
+ assertEquals(expectedHost, policy.invoke().select(hosts, server))
+ }
+
+ @ParameterizedTest
+ @MethodSource("provisionedCoresArgs")
+ fun testProvisionedPolicy(
+ reversed: Boolean,
+ hosts: Set<HostView>,
+ server: Server,
+ expectedHost: HostView?
+ ) {
+ val policy = ProvisionedCoresAllocationPolicy(reversed)
+ assertEquals(expectedHost, policy.invoke().select(hosts, server))
+ }
+
+ @Suppress("unused")
+ private companion object {
+ /**
+ * Test arguments for the [NumberOfActiveServersAllocationPolicy].
+ */
+ @JvmStatic
+ fun activeServersArgs(): Stream<Arguments> {
+ val random = Random(1)
+ val hosts = List(4) { i ->
+ val view = mockk<HostView>()
+ every { view.host.uid } returns UUID(0, i.toLong())
+ every { view.host.model.cpuCount } returns random.nextInt(1, 16)
+ every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024)
+ every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize)
+ every { view.numberOfActiveServers } returns random.nextInt(0, 6)
+ every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount)
+ every { view.toString() } returns "HostView[$i,numberOfActiveServers=${view.numberOfActiveServers}]"
+ view
+ }
+
+ val servers = List(2) {
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns random.nextInt(1, 8)
+ every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512)
+ server
+ }
+
+ return Stream.of(
+ Arguments.of(false, hosts.toSet(), servers[0], hosts[2]),
+ Arguments.of(false, hosts.toSet(), servers[1], hosts[1]),
+ Arguments.of(true, hosts.toSet(), servers[1], hosts[0]),
+ )
+ }
+
+ /**
+ * Test arguments for the [AvailableCoreMemoryAllocationPolicy].
+ */
+ @JvmStatic
+ fun availableCoreMemoryArgs(): Stream<Arguments> {
+ val random = Random(1)
+ val hosts = List(4) { i ->
+ val view = mockk<HostView>()
+ every { view.host.uid } returns UUID(0, i.toLong())
+ every { view.host.model.cpuCount } returns random.nextInt(1, 16)
+ every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024)
+ every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize)
+ every { view.numberOfActiveServers } returns random.nextInt(0, 6)
+ every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount)
+ every { view.toString() } returns "HostView[$i,availableMemory=${view.availableMemory}]"
+ view
+ }
+
+ val servers = List(2) {
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns random.nextInt(1, 8)
+ every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512)
+ server
+ }
+
+ return Stream.of(
+ Arguments.of(false, hosts.toSet(), servers[0], hosts[2]),
+ Arguments.of(false, hosts.toSet(), servers[1], hosts[2]),
+ Arguments.of(true, hosts.toSet(), servers[1], hosts[1]),
+ )
+ }
+
+ /**
+ * Test arguments for the [AvailableMemoryAllocationPolicy].
+ */
+ @JvmStatic
+ fun availableMemoryArgs(): Stream<Arguments> {
+ val random = Random(1)
+ val hosts = List(4) { i ->
+ val view = mockk<HostView>()
+ every { view.host.uid } returns UUID(0, i.toLong())
+ every { view.host.model.cpuCount } returns random.nextInt(1, 16)
+ every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024)
+ every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize)
+ every { view.numberOfActiveServers } returns random.nextInt(0, 6)
+ every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount)
+ every { view.toString() } returns "HostView[$i,availableMemory=${view.availableMemory}]"
+ view
+ }
+
+ val servers = List(2) {
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns random.nextInt(1, 8)
+ every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512)
+ server
+ }
+
+ return Stream.of(
+ Arguments.of(false, hosts.toSet(), servers[0], hosts[2]),
+ Arguments.of(false, hosts.toSet(), servers[1], hosts[2]),
+ Arguments.of(true, hosts.toSet(), servers[1], hosts[1]),
+ )
+ }
+
+ /**
+ * Test arguments for the [ProvisionedCoresAllocationPolicy].
+ */
+ @JvmStatic
+ fun provisionedCoresArgs(): Stream<Arguments> {
+ val random = Random(1)
+ val hosts = List(4) { i ->
+ val view = mockk<HostView>()
+ every { view.host.uid } returns UUID(0, i.toLong())
+ every { view.host.model.cpuCount } returns random.nextInt(1, 16)
+ every { view.host.model.memorySize } returns random.nextLong(1024, 1024 * 1024)
+ every { view.availableMemory } returns random.nextLong(0, view.host.model.memorySize)
+ every { view.numberOfActiveServers } returns random.nextInt(0, 6)
+ every { view.provisionedCores } returns random.nextInt(0, view.host.model.cpuCount)
+ every { view.toString() } returns "HostView[$i,provisionedCores=${view.provisionedCores}]"
+ view
+ }
+
+ val servers = List(2) {
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns random.nextInt(1, 8)
+ every { server.flavor.memorySize } returns random.nextLong(1024, 1024 * 512)
+ server
+ }
+
+ return Stream.of(
+ Arguments.of(false, hosts.toSet(), servers[0], hosts[2]),
+ Arguments.of(false, hosts.toSet(), servers[1], hosts[0]),
+ Arguments.of(true, hosts.toSet(), servers[1], hosts[0]),
+ )
+ }
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml b/simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml
new file mode 100644
index 00000000..0dfb75f2
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/test/resources/log4j2.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2021 AtLarge Research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN" packages="org.apache.logging.log4j.core">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="org.opendc" level="trace" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
index 1ad3f1c6..3bf8a114 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -38,5 +38,6 @@ dependencies {
implementation("io.github.microutils:kotlin-logging")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+ testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}")
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 3c4b4410..6d81aa7d 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,8 +22,9 @@
package org.opendc.compute.simulator
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.common.Labels
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.Flow
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
@@ -36,7 +37,6 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.MachinePowerModel
import org.opendc.simulator.failures.FailureDomain
-import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -52,6 +52,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
clock: Clock,
+ meter: Meter,
hypervisor: SimHypervisorProvider,
powerModel: MachinePowerModel = ConstantPowerModel(0.0),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
@@ -59,17 +60,13 @@ public class SimHost(
/**
* The [CoroutineScope] of the host bounded by the lifecycle of the host.
*/
- override val scope: CoroutineScope = CoroutineScope(context)
+ override val scope: CoroutineScope = CoroutineScope(context + Job())
/**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
- override val events: Flow<HostEvent>
- get() = _events
- internal val _events = EventFlow<HostEvent>()
-
/**
* The event listeners registered with this host.
*/
@@ -99,18 +96,13 @@ public class SimHost(
cpuUsage: Double,
cpuDemand: Double
) {
- _events.emit(
- HostEvent.SliceFinished(
- this@SimHost,
- requestedWork,
- grantedWork,
- overcommittedWork,
- interferedWork,
- cpuUsage,
- cpuDemand,
- guests.size
- )
- )
+ _cpuWork.record(requestedWork.toDouble())
+ _cpuWorkGranted.record(grantedWork.toDouble())
+ _cpuWorkOvercommit.record(overcommittedWork.toDouble())
+ _cpuWorkInterference.record(interferedWork.toDouble())
+ _cpuUsage.record(cpuUsage)
+ _cpuDemand.record(cpuDemand)
+ _cpuPower.record(machine.powerDraw.value)
}
}
)
@@ -132,6 +124,87 @@ public class SimHost(
override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum())
+ /**
+ * The number of guests on the host.
+ */
+ private val _guests = meter.longUpDownCounterBuilder("guests.total")
+ .setDescription("Number of guests")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The number of active guests on the host.
+ */
+ private val _activeGuests = meter.longUpDownCounterBuilder("guests.active")
+ .setDescription("Number of active guests")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The CPU usage on the host.
+ */
+ private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage")
+ .setDescription("The amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The CPU demand on the host.
+ */
+ private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand")
+ .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The power draw of the machine.
+ */
+ private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage")
+ .setDescription("The amount of power used by the CPU")
+ .setUnit("W")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The requested work for the CPU.
+ */
+ private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total")
+ .setDescription("The amount of work supplied to the CPU")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work actually performed by the CPU.
+ */
+ private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted")
+ .setDescription("The amount of work performed by the CPU")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work that could not be performed by the CPU due to overcommitting resources.
+ */
+ private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit")
+ .setDescription("The amount of work not performed by the CPU due to overcommitment")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work that could not be performed by the CPU due to interference.
+ */
+ private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference")
+ .setDescription("The amount of work not performed by the CPU due to interference")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
init {
// Launch hypervisor onto machine
scope.launch {
@@ -166,12 +239,11 @@ public class SimHost(
require(canFit(server)) { "Server does not fit" }
val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
guests[server] = guest
+ _guests.add(1)
if (start) {
guest.start()
}
-
- _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override fun contains(server: Server): Boolean {
@@ -191,6 +263,7 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
guest.terminate()
+ _guests.add(-1)
}
override fun addListener(listener: HostListener) {
@@ -207,6 +280,8 @@ public class SimHost(
_state = HostState.DOWN
}
+ override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
+
/**
* Convert flavor to machine model.
*/
@@ -226,6 +301,7 @@ public class SimHost(
}
}
+ _activeGuests.add(1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
}
@@ -236,9 +312,8 @@ public class SimHost(
}
}
+ _activeGuests.add(-1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
-
- _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override suspend fun fail() {
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index e311cd21..830fc868 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -22,12 +22,14 @@
package org.opendc.compute.simulator
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
+import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -37,7 +39,8 @@ import org.opendc.compute.api.Image
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.api.ServerWatcher
-import org.opendc.compute.service.driver.HostEvent
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostListener
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -45,23 +48,20 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import java.time.Clock
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.sdk.toOtelClock
import java.util.UUID
+import kotlin.coroutines.resume
/**
* Basic test-suite for the hypervisor.
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
- private lateinit var scope: TestCoroutineScope
- private lateinit var clock: Clock
private lateinit var machineModel: SimMachineModel
@BeforeEach
fun setUp() {
- scope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(scope)
-
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
@@ -74,72 +74,98 @@ internal class SimHostTest {
* Test overcommitting of resources by the hypervisor.
*/
@Test
- fun testOvercommitted() {
+ fun testOvercommitted() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
var requestedWork = 0L
var grantedWork = 0L
var overcommittedWork = 0L
- scope.launch {
- val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider())
- val duration = 5 * 60L
- val vmImageA = MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
- ),
- )
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider())
+ val duration = 5 * 60L
+ val vmImageA = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
+ ),
)
)
- val vmImageB = MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
- )
+ )
+ val vmImageB = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
)
)
)
+ )
- delay(5)
-
- val flavor = MockFlavor(2, 0)
- virtDriver.events
- .onEach { event ->
- when (event) {
- is HostEvent.SliceFinished -> {
- requestedWork += event.requestedBurst
- grantedWork += event.grantedBurst
- overcommittedWork += event.overcommissionedBurst
- }
- }
+ val flavor = MockFlavor(2, 0)
+
+ // Setup metric reader
+ val reader = CoroutineMetricReader(
+ this, listOf(meterProvider as MetricProducer),
+ object : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+ requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong()
+ grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong()
+ overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong()
+ return CompletableResultCode.ofSuccess()
}
- .launchIn(this)
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ },
+ exportInterval = duration * 1000
+ )
+
+ coroutineScope {
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
+
+ suspendCancellableCoroutine<Unit> { cont ->
+ virtDriver.addListener(object : HostListener {
+ private var finished = 0
+
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ if (newState == ServerState.TERMINATED && ++finished == 2) {
+ cont.resume(Unit)
+ }
+ }
+ })
+ }
}
- scope.advanceUntilIdle()
+ // Ensure last cycle is collected
+ delay(1000 * duration)
+ virtDriver.close()
+ reader.close()
assertAll(
- { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
{ assertEquals(4197600, requestedWork, "Requested work does not match") },
{ assertEquals(2157600, grantedWork, "Granted work does not match") },
{ assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
- { assertEquals(1200006, scope.currentTime) }
+ { assertEquals(1500001, currentTime) }
)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 2d0da1bf..b2d7cc30 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -47,4 +47,6 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
+
+ implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 44436019..40f50235 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -22,26 +22,19 @@
package org.opendc.experiments.capelin
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.collect
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.flow.takeWhile
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostEvent
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.compute.simulator.SimHost
+import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
@@ -51,9 +44,11 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
import org.opendc.simulator.failures.FaultInjector
-import org.opendc.trace.core.EventTracer
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Clock
+import kotlin.coroutines.coroutineContext
+import kotlin.coroutines.resume
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -136,13 +131,13 @@ public fun createTraceReader(
/**
* Construct the environment for a simulated compute service..
*/
-public fun createComputeService(
- coroutineScope: CoroutineScope,
+public suspend fun withComputeService(
clock: Clock,
+ meterProvider: MeterProvider,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
- eventTracer: EventTracer
-): ComputeServiceImpl {
+ block: suspend CoroutineScope.(ComputeService) -> Unit
+): Unit = coroutineScope {
val hosts = environmentReader
.use { it.read() }
.map { def ->
@@ -151,33 +146,43 @@ public fun createComputeService(
def.name,
def.model,
def.meta,
- coroutineScope.coroutineContext,
+ coroutineContext,
clock,
+ meterProvider.get("opendc-compute-simulator"),
SimFairShareHypervisorProvider(),
def.powerModel
)
}
+ val schedulerMeter = meterProvider.get("opendc-compute")
val scheduler =
- ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
+ ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy)
for (host in hosts) {
scheduler.addHost(host)
}
- return scheduler
+ try {
+ block(this, scheduler)
+ } finally {
+ scheduler.close()
+ hosts.forEach(SimHost::close)
+ }
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public fun attachMonitor(
- coroutineScope: CoroutineScope,
+public suspend fun withMonitor(
+ monitor: ExperimentMonitor,
clock: Clock,
+ metricProducer: MetricProducer,
scheduler: ComputeService,
- monitor: ExperimentMonitor
-) {
+ block: suspend CoroutineScope.() -> Unit
+): Unit = coroutineScope {
+ val monitorJobs = mutableSetOf<Job>()
+
// Monitor host events
for (host in scheduler.hosts) {
monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
@@ -186,45 +191,55 @@ public fun attachMonitor(
monitor.reportHostStateChange(clock.millis(), host, newState)
}
})
+ }
- host.events
- .onEach { event ->
- when (event) {
- is HostEvent.SliceFinished -> monitor.reportHostSlice(
- clock.millis(),
- event.requestedBurst,
- event.grantedBurst,
- event.overcommissionedBurst,
- event.interferedBurst,
- event.cpuUsage,
- event.cpuDemand,
- event.numberOfDeployedImages,
- event.driver
- )
- }
- }
- .launchIn(coroutineScope)
+ val reader = CoroutineMetricReader(
+ this,
+ listOf(metricProducer),
+ ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }),
+ exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */
+ )
- (host as SimHost).machine.powerDraw
- .onEach { monitor.reportPowerConsumption(host, it) }
- .launchIn(coroutineScope)
+ try {
+ block(this)
+ } finally {
+ monitorJobs.forEach(Job::cancel)
+ reader.close()
+ monitor.close()
}
+}
- scheduler.events
- .onEach { event ->
- when (event) {
- is ComputeServiceEvent.MetricsAvailable ->
- monitor.reportProvisionerMetrics(clock.millis(), event)
- }
- }
- .launchIn(coroutineScope)
+public class ComputeMetrics { // server counters filled in by collectMetrics; each stays 0 when its metric is unavailable
+ public var submittedVms: Int = 0 // read from the "servers.submitted" metric
+ public var queuedVms: Int = 0 // read from the "servers.waiting" metric
+ public var runningVms: Int = 0 // read from the "servers.active" metric
+ public var unscheduledVms: Int = 0 // read from the "servers.unscheduled" metric
+ public var finishedVms: Int = 0 // read from the "servers.finished" metric
+}
+
+/**
+ * Collect the server counters of the compute service from [metricProducer]; a counter whose metric is missing or has no points is 0.
+ */
+public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
+    val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
+    val res = ComputeMetrics()
+    try {
+        // Hack to extract metrics from OpenTelemetry SDK; lastOrNull() keeps one empty point list from discarding the other counters
+        res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.lastOrNull()?.value?.toInt() ?: 0
+        res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.lastOrNull()?.value?.toInt() ?: 0
+        res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.lastOrNull()?.value?.toInt() ?: 0
+        res.runningVms = metrics["servers.active"]?.longSumData?.points?.lastOrNull()?.value?.toInt() ?: 0
+        res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.lastOrNull()?.value?.toInt() ?: 0
+    } catch (cause: Throwable) {
+        logger.warn(cause) { "Failed to collect metrics" }
+    }
+    return res
}
/**
* Process the trace.
*/
public suspend fun processTrace(
- coroutineScope: CoroutineScope,
clock: Clock,
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
@@ -233,44 +248,46 @@ public suspend fun processTrace(
) {
val client = scheduler.newClient()
val image = client.newImage("vm-image")
+ var offset = Long.MIN_VALUE
try {
- var submitted = 0
+ coroutineScope {
+ while (reader.hasNext()) {
+ val entry = reader.next()
- while (reader.hasNext()) {
- val entry = reader.next()
+ if (offset < 0) {
+ offset = entry.start - clock.millis()
+ }
- submitted++
- delay(max(0, entry.start - clock.millis()))
- coroutineScope.launch {
- chan.send(Unit)
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
+ delay(max(0, (entry.start - offset) - clock.millis()))
+ launch {
+ chan.send(Unit)
+ val server = client.newServer(
entry.name,
- entry.meta["cores"] as Int,
- entry.meta["required-memory"] as Long
- ),
- meta = entry.meta
- )
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta
+ )
- server.watch(object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor.reportVmStateChange(clock.millis(), server, newState)
+ suspendCancellableCoroutine { cont ->
+ server.watch(object : ServerWatcher {
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ monitor.reportVmStateChange(clock.millis(), server, newState)
+
+ if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
+ cont.resume(Unit)
+ }
+ }
+ })
}
- })
+ }
}
}
- scheduler.events
- .takeWhile {
- when (it) {
- is ComputeServiceEvent.MetricsAvailable ->
- it.inactiveVmCount + it.failedVmCount != submitted
- }
- }
- .collect()
- delay(1)
+ yield()
} finally {
reader.close()
client.close()
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index f9c96bb6..5fa77161 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -22,19 +22,15 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
-import org.opendc.compute.service.scheduler.AllocationPolicy
-import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
-import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
-import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
-import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
-import org.opendc.compute.service.scheduler.RandomAllocationPolicy
-import org.opendc.compute.simulator.allocation.*
+import org.opendc.compute.service.scheduler.*
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
@@ -47,7 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import org.opendc.trace.core.EventTracer
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
@@ -117,16 +113,19 @@ public abstract class Portfolio(name: String) : Experiment(name) {
* Perform a single trial for this portfolio.
*/
@OptIn(ExperimentalCoroutinesApi::class)
- override fun doRun(repeat: Int) {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val tracer = EventTracer(clock)
+ override fun doRun(repeat: Int): Unit = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val seeder = Random(repeat)
val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = createAllocationPolicy(seeder)
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
val workload = workload
val workloadNames = if (workload is CompositeWorkload) {
workload.workloads.map { it.name }
@@ -152,15 +151,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
4096
)
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environment,
- allocationPolicy,
- tracer
- )
-
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
@@ -175,31 +166,21 @@ public abstract class Portfolio(name: String) : Experiment(name) {
null
}
- attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- this,
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
-
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
failureDomain?.cancel()
- scheduler.close()
}
- try {
- testScope.advanceUntilIdle()
- } finally {
- monitor.close()
- }
+ val monitorResults = collectMetrics(meterProvider as MetricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
}
/**
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
new file mode 100644
index 00000000..799de60f
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.monitor
+
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import org.opendc.compute.service.driver.Host
+import java.time.Clock
+
+/**
+ * A [MetricExporter] that exports the metrics to the [ExperimentMonitor].
+ *
+ * @param monitor The monitor that receives the host slices, power draw and provisioner metrics.
+ * @param clock The clock used to timestamp the reports.
+ * @param hosts The hosts to report on, keyed by the value of the "host" label on their metric points.
+ */
+public class ExperimentMetricExporter(
+    private val monitor: ExperimentMonitor,
+    private val clock: Clock,
+    private val hosts: Map<String, Host>
+) : MetricExporter {
+    /**
+     * Report the given [metrics] to the monitor. This implementation always returns a successful result.
+     */
+    override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+        val metricsByName = metrics.associateBy { it.name }
+        reportHostMetrics(metricsByName)
+        reportProvisionerMetrics(metricsByName)
+        return CompletableResultCode.ofSuccess()
+    }
+
+    /**
+     * Report a host slice and a power sample for every known host. A field stays at zero when the
+     * corresponding metric has no point labelled with that host.
+     */
+    private fun reportHostMetrics(metrics: Map<String, MetricData>) {
+        val hostMetrics = mutableMapOf<String, HostMetrics>()
+        hosts.mapValuesTo(hostMetrics) { HostMetrics() }
+
+        mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
+            m.cpuDemand = v
+        }
+
+        mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
+            m.cpuUsage = v
+        }
+
+        mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v ->
+            m.powerDraw = v
+        }
+
+        mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
+            m.requestedBurst = v.toLong()
+        }
+
+        mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
+            m.grantedBurst = v.toLong()
+        }
+
+        mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
+            m.overcommissionedBurst = v.toLong()
+        }
+
+        // Must match the instrument name registered by SimHost ("cpu.work.interference").
+        mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v ->
+            m.interferedBurst = v.toLong()
+        }
+
+        mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
+            m.numberOfDeployedImages = v.toInt()
+        }
+
+        for ((id, hostMetric) in hostMetrics) {
+            val host = hosts.getValue(id)
+            monitor.reportHostSlice(
+                clock.millis(),
+                hostMetric.requestedBurst,
+                hostMetric.grantedBurst,
+                hostMetric.overcommissionedBurst,
+                hostMetric.interferedBurst,
+                hostMetric.cpuUsage,
+                hostMetric.cpuDemand,
+                hostMetric.numberOfDeployedImages,
+                host
+            )
+
+            monitor.reportPowerConsumption(host, hostMetric.powerDraw)
+        }
+    }
+
+    /** Invoke [block] with the sum of every double summary point in [data] that is labelled with a known host. */
+    private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+        val points = data?.doubleSummaryData?.points ?: emptyList()
+        for (point in points) {
+            val hostMetric = hostMetrics[point.labels["host"]] ?: continue
+            block(hostMetric, point.sum)
+        }
+    }
+
+    /** Invoke [block] with the value of every long sum point in [data] that is labelled with a known host. */
+    private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
+        val points = data?.longSumData?.points ?: emptyList()
+        for (point in points) {
+            val hostMetric = hostMetrics[point.labels["host"]] ?: continue
+            block(hostMetric, point.value)
+        }
+    }
+
+    /** Return the last point of the long sum metric [name] as an Int, or 0 when the metric is missing or has no points. */
+    private fun lastLongSum(metrics: Map<String, MetricData>, name: String): Int =
+        metrics[name]?.longSumData?.points?.lastOrNull()?.value?.toInt() ?: 0
+
+    /** Report the provisioner counters; a metric that is missing or has no points is reported as 0. */
+    private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
+        val submittedVms = lastLongSum(metrics, "servers.submitted")
+        val queuedVms = lastLongSum(metrics, "servers.waiting")
+        val unscheduledVms = lastLongSum(metrics, "servers.unscheduled")
+        val runningVms = lastLongSum(metrics, "servers.active")
+        val finishedVms = lastLongSum(metrics, "servers.finished")
+        val totalHosts = lastLongSum(metrics, "hosts.total")
+        val availableHosts = lastLongSum(metrics, "hosts.available")
+
+        monitor.reportProvisionerMetrics(
+            clock.millis(),
+            totalHosts,
+            availableHosts,
+            submittedVms,
+            runningVms,
+            finishedVms,
+            queuedVms,
+            unscheduledVms
+        )
+    }
+
+    /** Mutable holder for the values reported for a single host; every field defaults to zero. */
+    private class HostMetrics {
+        var requestedBurst: Long = 0
+        var grantedBurst: Long = 0
+        var overcommissionedBurst: Long = 0
+        var interferedBurst: Long = 0
+        var cpuUsage: Double = 0.0
+        var cpuDemand: Double = 0.0
+        var numberOfDeployedImages: Int = 0
+        var powerDraw: Double = 0.0
+    }
+
+    override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+    override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 14cc06dc..5e75c890 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -24,15 +24,13 @@ package org.opendc.experiments.capelin.monitor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
-import java.io.Closeable
/**
* A monitor watches the events of an experiment.
*/
-public interface ExperimentMonitor : Closeable {
+public interface ExperimentMonitor : AutoCloseable {
/**
* This method is invoked when the state of a VM changes.
*/
@@ -68,5 +66,14 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked for a provisioner event.
*/
- public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {}
+ public fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index c9d57a98..0e675d87 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.telemetry.HostEvent
@@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
}
}
- override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
provisionerWriter.write(
ProvisionerEvent(
time,
- event.totalHostCount,
- event.availableHostCount,
- event.totalVmCount,
- event.activeVmCount,
- event.inactiveVmCount,
- event.waitingVmCount,
- event.failedVmCount
+ totalHostCount,
+ availableHostCount,
+ totalVmCount,
+ activeVmCount,
+ inactiveVmCount,
+ waitingVmCount,
+ failedVmCount
)
)
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index a812490a..02cfdc06 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,19 +22,18 @@
package org.opendc.experiments.capelin
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.AfterEach
+import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -45,9 +44,8 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import org.opendc.trace.core.EventTracer
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
-import java.time.Clock
/**
* An integration test suite for the SC20 experiments.
@@ -55,16 +53,6 @@ import java.time.Clock
@OptIn(ExperimentalCoroutinesApi::class)
class CapelinIntegrationTest {
/**
- * The [TestCoroutineScope] to use.
- */
- private lateinit var testScope: TestCoroutineScope
-
- /**
- * The simulation clock to use.
- */
- private lateinit var clock: Clock
-
- /**
* The monitor used to keep track of the metrics.
*/
private lateinit var monitor: TestExperimentReporter
@@ -74,38 +62,26 @@ class CapelinIntegrationTest {
*/
@BeforeEach
fun setUp() {
- testScope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(testScope)
-
monitor = TestExperimentReporter()
}
- /**
- * Tear down the experimental environment.
- */
- @AfterEach
- fun tearDown() = testScope.cleanupTestCoroutines()
-
@Test
- fun testLarge() {
+ fun testLarge() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val failures = false
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var scheduler: ComputeServiceImpl
- val tracer = EventTracer(clock)
-
- testScope.launch {
- scheduler = createComputeService(
- this,
- clock,
- environmentReader,
- allocationPolicy,
- tracer
- )
+ lateinit var monitorResults: ComputeMetrics
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
@@ -120,29 +96,28 @@ class CapelinIntegrationTest {
null
}
- attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- this,
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
-
- println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
failureDomain?.cancel()
- scheduler.close()
- monitor.close()
}
- runSimulation()
+ monitorResults = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
+ { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
+ { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
+ { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
+ { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
{ assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
{ assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
{ assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
@@ -151,41 +126,33 @@ class CapelinIntegrationTest {
}
@Test
- fun testSmall() {
+ fun testSmall() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
- val tracer = EventTracer(clock)
-
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environmentReader,
- allocationPolicy,
- tracer
- )
- attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- this,
- clock,
- traceReader,
- scheduler,
- chan,
- monitor
- )
-
- yield()
-
- println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
-
- scheduler.close()
- monitor.close()
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ traceReader,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
}
- runSimulation()
+ val metrics = collectMetrics(meterProvider as MetricProducer)
+ println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
// Note that these values have been verified beforehand
assertAll(
@@ -197,11 +164,6 @@ class CapelinIntegrationTest {
}
/**
- * Run the simulation.
- */
- private fun runSimulation() = testScope.advanceUntilIdle()
-
- /**
* Obtain the trace reader for the test.
*/
private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore b/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore
deleted file mode 100644
index ba64707c..00000000
--- a/simulator/opendc-experiments/opendc-experiments-sc18/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-input/
-output/
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
deleted file mode 100644
index 9e305b3d..00000000
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.sc18
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.test.TestCoroutineScope
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
-import org.opendc.compute.simulator.SimHost
-import org.opendc.format.environment.sc18.Sc18EnvironmentReader
-import org.opendc.format.trace.gwf.GwfTraceReader
-import org.opendc.harness.dsl.Experiment
-import org.opendc.harness.dsl.anyOf
-import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
-import org.opendc.simulator.utils.DelayControllerClockAdapter
-import org.opendc.trace.core.EventTracer
-import org.opendc.trace.core.enable
-import org.opendc.workflow.service.WorkflowEvent
-import org.opendc.workflow.service.WorkflowService
-import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
-import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
-import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
-import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
-import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
-import java.io.File
-import java.io.FileInputStream
-import kotlin.math.max
-
-/**
- * The [UnderspecificationExperiment] investigates the impact of scheduler underspecification on performance.
- * It focuses on components that must exist (that is, based on their own publications, the correct operation of the
- * schedulers under study requires these components), yet have been left underspecified by their author.
- */
-public class UnderspecificationExperiment : Experiment("underspecification") {
- /**
- * The workflow traces to test.
- */
- private val trace: String by anyOf("input/traces/chronos_exp_noscaler_ca.gwf")
-
- /**
- * The datacenter environments to test.
- */
- private val environment: String by anyOf("input/environments/base.json")
-
- @OptIn(ExperimentalCoroutinesApi::class)
- override fun doRun(repeat: Int) {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val tracer = EventTracer(clock)
- val recording = tracer.openRecording().run {
- enable<WorkflowEvent.JobSubmitted>()
- enable<WorkflowEvent.JobStarted>()
- enable<WorkflowEvent.JobFinished>()
- enable<WorkflowEvent.TaskStarted>()
- enable<WorkflowEvent.TaskFinished>()
- this
- }
-
- testScope.launch {
- launch { println("MAKESPAN: ${recording.workflowRuntime()}") }
- launch { println("WAIT: ${recording.workflowWaitingTime()}") }
- recording.start()
- }
-
- testScope.launch {
- val hosts = Sc18EnvironmentReader(FileInputStream(File(environment)))
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- testScope.coroutineContext,
- clock,
- SimSpaceSharedHypervisorProvider()
- )
- }
-
- val compute = ComputeService(
- testScope.coroutineContext,
- clock,
- tracer,
- NumberOfActiveServersAllocationPolicy(),
- )
-
- hosts.forEach { compute.addHost(it) }
-
- val scheduler = WorkflowService(
- testScope.coroutineContext,
- clock,
- tracer,
- compute.newClient(),
- mode = WorkflowSchedulerMode.Batch(100),
- jobAdmissionPolicy = NullJobAdmissionPolicy,
- jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
- taskEligibilityPolicy = NullTaskEligibilityPolicy,
- taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
- )
-
- val reader = GwfTraceReader(File(trace))
-
- while (reader.hasNext()) {
- val entry = reader.next()
- delay(max(0, entry.start * 1000 - clock.millis()))
- scheduler.submit(entry.workload)
- }
- }
-
- testScope.advanceUntilIdle()
- recording.close()
-
- // Check whether everything went okay
- testScope.uncaughtExceptions.forEach { it.printStackTrace() }
- assert(testScope.uncaughtExceptions.isEmpty()) { "Errors occurred during execution of the experiment" }
- }
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
deleted file mode 100644
index a8356888..00000000
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.sc18
-
-import org.opendc.trace.core.EventStream
-import org.opendc.trace.core.onEvent
-import org.opendc.workflow.service.WorkflowEvent
-import java.util.*
-import kotlin.coroutines.resume
-import kotlin.coroutines.suspendCoroutine
-
-/**
- * This function collects the makespan of workflows that appear in the event stream.
- */
-public suspend fun EventStream.workflowRuntime(): Map<UUID, Long> = suspendCoroutine { cont ->
- val starts = mutableMapOf<UUID, Long>()
- val results = mutableMapOf<UUID, Long>()
-
- onEvent<WorkflowEvent.JobStarted> {
- starts[it.job.uid] = it.timestamp
- }
- onEvent<WorkflowEvent.JobFinished> {
- val start = starts.remove(it.job.uid) ?: return@onEvent
- results[it.job.uid] = it.timestamp - start
- }
- onClose { cont.resume(results) }
-}
-
-/**
- * This function collects the waiting time of workflows that appear in the event stream, which the duration between the
- * workflow submission and the start of the first task.
- */
-public suspend fun EventStream.workflowWaitingTime(): Map<UUID, Long> = suspendCoroutine { cont ->
- val starts = mutableMapOf<UUID, Long>()
- val results = mutableMapOf<UUID, Long>()
-
- onEvent<WorkflowEvent.JobStarted> {
- starts[it.job.uid] = it.timestamp
- }
- onEvent<WorkflowEvent.TaskStarted> {
- results.computeIfAbsent(it.job.uid) { _ ->
- val start = starts.remove(it.job.uid)!!
- it.timestamp - start
- }
- }
- onClose { cont.resume(results) }
-}
-
-/**
- * This function collects the response time of tasks that appear in the event stream.
- */
-public suspend fun EventStream.taskResponse(): Map<UUID, Long> = suspendCoroutine { cont ->
- val starts = mutableMapOf<UUID, Long>()
- val results = mutableMapOf<UUID, Long>()
-
- onEvent<WorkflowEvent.JobSubmitted> {
- for (task in it.job.tasks) {
- starts[task.uid] = it.timestamp
- }
- }
- onEvent<WorkflowEvent.TaskFinished> {
- val start = starts.remove(it.job.uid) ?: return@onEvent
- results[it.task.uid] = it.timestamp - start
- }
- onClose { cont.resume(results) }
-}
diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts
index d07fe7a6..fcc78a83 100644
--- a/simulator/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc-runner-web/build.gradle.kts
@@ -48,4 +48,6 @@ dependencies {
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}")
runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}")
+
+ implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index b9aeecb8..5b717ff7 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -34,9 +34,12 @@ import com.mongodb.client.MongoClients
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.bson.Document
import org.bson.types.ObjectId
@@ -45,19 +48,14 @@ import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
import org.opendc.compute.service.scheduler.RandomAllocationPolicy
-import org.opendc.compute.simulator.allocation.*
-import org.opendc.experiments.capelin.attachMonitor
-import org.opendc.experiments.capelin.createComputeService
-import org.opendc.experiments.capelin.createFailureDomain
+import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.processTrace
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import org.opendc.trace.core.EventTracer
+import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
-import kotlin.coroutines.coroutineContext
import kotlin.random.Random
private val logger = KotlinLogging.logger {}
@@ -208,93 +206,85 @@ public class RunnerCli : CliktCommand(name = "runner") {
traceReader: Sc20RawParquetTraceReader,
performanceInterferenceReader: Sc20PerformanceInterferenceReader?
): WebExperimentMonitor.Result {
- val seed = repeat
- val traceDocument = scenario.get("trace", Document::class.java)
- val workloadName = traceDocument.getString("traceId")
- val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
-
- val seeder = Random(seed)
- val testScope = TestCoroutineScope(Job(parent = coroutineContext[Job]))
- val clock = DelayControllerClockAdapter(testScope)
-
- val chan = Channel<Unit>(Channel.CONFLATED)
-
- val operational = scenario.get("operational", Document::class.java)
- val allocationPolicy =
- when (val policyName = operational.getString("schedulerName")) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
- else -> throw IllegalArgumentException("Unknown policy $policyName")
- }
-
- val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(
- listOf(traceReader),
- performanceInterferenceModel,
- Workload(workloadName, workloadFraction),
- seed
- )
- val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
- val environment = TopologyParser(topologies, topologyId)
val monitor = WebExperimentMonitor()
- val tracer = EventTracer(clock)
-
- testScope.launch {
- val scheduler = createComputeService(
- this,
- clock,
- environment,
- allocationPolicy,
- tracer
- )
-
- val failureDomain = if (operational.getBoolean("failuresEnabled")) {
- logger.debug("ENABLING failures")
- createFailureDomain(
- testScope,
- clock,
- seeder.nextInt(),
- operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
- scheduler,
- chan
+
+ try {
+ runBlockingTest {
+ val seed = repeat
+ val traceDocument = scenario.get("trace", Document::class.java)
+ val workloadName = traceDocument.getString("traceId")
+ val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+ val seeder = Random(seed)
+ val clock = DelayControllerClockAdapter(this)
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+ val metricProducer = meterProvider as MetricProducer
+
+ val operational = scenario.get("operational", Document::class.java)
+ val allocationPolicy =
+ when (val policyName = operational.getString("schedulerName")) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ else -> throw IllegalArgumentException("Unknown policy $policyName")
+ }
+
+ val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(
+ listOf(traceReader),
+ performanceInterferenceModel,
+ Workload(workloadName, workloadFraction),
+ seed
)
- } else {
- null
- }
+ val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
+ val environment = TopologyParser(topologies, topologyId)
+ val failureFrequency = operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7
+
+ withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler ->
+ val failureDomain = if (failureFrequency > 0) {
+ logger.debug { "ENABLING failures" }
+ createFailureDomain(
+ this,
+ clock,
+ seeder.nextInt(),
+ failureFrequency,
+ scheduler,
+ chan
+ )
+ } else {
+ null
+ }
- attachMonitor(this, clock, scheduler, monitor)
- processTrace(
- this,
- clock,
- trace,
- scheduler,
- chan,
- monitor
- )
-
- logger.debug("SUBMIT=${scheduler.submittedVms}")
- logger.debug("FAIL=${scheduler.unscheduledVms}")
- logger.debug("QUEUED=${scheduler.queuedVms}")
- logger.debug("RUNNING=${scheduler.runningVms}")
- logger.debug("FINISHED=${scheduler.finishedVms}")
-
- failureDomain?.cancel()
- scheduler.close()
- }
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
- try {
- testScope.advanceUntilIdle()
- testScope.uncaughtExceptions.forEach { it.printStackTrace() }
- } finally {
- monitor.close()
- testScope.cancel()
+ failureDomain?.cancel()
+ }
+
+ val monitorResults = collectMetrics(metricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Experiment failed" }
}
return monitor.getResult()
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index a8ac6c10..fcd43ea7 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -25,7 +25,6 @@ package org.opendc.runner.web
import mu.KotlinLogging
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -205,13 +204,22 @@ public class WebExperimentMonitor : ExperimentMonitor {
private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
- override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
provisionerMetrics = AggregateProvisionerMetrics(
- max(event.totalVmCount, provisionerMetrics.vmTotalCount),
- max(event.waitingVmCount, provisionerMetrics.vmWaitingCount),
- max(event.activeVmCount, provisionerMetrics.vmActiveCount),
- max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount),
- max(event.failedVmCount, provisionerMetrics.vmFailedCount),
+ max(totalVmCount, provisionerMetrics.vmTotalCount),
+ max(waitingVmCount, provisionerMetrics.vmWaitingCount),
+ max(activeVmCount, provisionerMetrics.vmActiveCount),
+ max(inactiveVmCount, provisionerMetrics.vmInactiveCount),
+ max(failedVmCount, provisionerMetrics.vmFailedCount),
)
}
diff --git a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts
index bcb08be7..0221829a 100644
--- a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts
+++ b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts
@@ -32,7 +32,6 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-serverless:opendc-serverless-api"))
- api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
implementation("io.github.microutils:kotlin-logging")
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 1c0f94fd..2127b066 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -108,8 +108,11 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
.launchIn(this)
launch {
- source.consume(consumer)
- job.cancel()
+ try {
+ source.consume(consumer)
+ } finally {
+ job.cancel()
+ }
}
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index f86c4198..830ff70e 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -71,6 +71,7 @@ public class SimBareMetalMachine(
override fun close() {
super.close()
+
scheduler.close()
scope.cancel()
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index 937b6966..f2eea97c 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -71,7 +71,7 @@ class SimResourceBenchmarks {
fun benchmarkForwardOverhead(state: Workload) {
return scope.runBlockingTest {
val provider = SimResourceSource(4200.0, clock, scheduler)
- val forwarder = SimResourceTransformer()
+ val forwarder = SimResourceForwarder()
provider.startConsumer(forwarder)
return@runBlockingTest forwarder.consume(state.consumers[0])
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 9705bd17..5c5ee038 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -54,8 +54,17 @@ public abstract class SimAbstractResourceContext(
override val remainingWork: Double
get() {
val activeCommand = activeCommand ?: return 0.0
- return computeRemainingWork(activeCommand, clock.millis())
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ computeRemainingWork(activeCommand, now).also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
}
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
/**
* A flag to indicate the state of the context.
@@ -201,6 +210,9 @@ public abstract class SimAbstractResourceContext(
// Flush may not be called when the resource consumer has finished
throw IllegalStateException()
}
+
+ // Invalidate the remaining work cache so that the next read recomputes it
+ _remainingWorkFlush = Long.MIN_VALUE
} catch (cause: Throwable) {
doStop(cause)
} finally {
diff --git a/simulator/opendc-trace/build.gradle.kts b/simulator/opendc-telemetry/build.gradle.kts
index a1a751a2..7edfd134 100644
--- a/simulator/opendc-trace/build.gradle.kts
+++ b/simulator/opendc-telemetry/build.gradle.kts
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
diff --git a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
index 3051f733..d9a4b4dd 100644
--- a/simulator/opendc-trace/opendc-trace-core/build.gradle.kts
+++ b/simulator/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-description = "Event tracing library for OpenDC"
+description = "Telemetry API for OpenDC"
/* Build configuration */
plugins {
@@ -29,5 +29,6 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api("org.jetbrains.kotlinx:kotlinx-coroutines-core")
+ api("io.opentelemetry:opentelemetry-api:${versions.otelApi}")
+ api("io.opentelemetry:opentelemetry-api-metrics:${versions.otelApiMetrics}")
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts
index 02e77c7c..350a0f74 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
+++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,19 @@
* SOFTWARE.
*/
-description = "Experiments for the SC18 article"
+description = "Telemetry SDK for OpenDC"
/* Build configuration */
plugins {
`kotlin-library-conventions`
- `experiment-conventions`
}
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-harness"))
- implementation(project(":opendc-format"))
- implementation(project(":opendc-workflow:opendc-workflow-service"))
- implementation(project(":opendc-simulator:opendc-simulator-core"))
- implementation(project(":opendc-compute:opendc-compute-simulator"))
+ api(project(":opendc-telemetry:opendc-telemetry-api"))
+ api("org.jetbrains.kotlinx:kotlinx-coroutines-core")
+ api("io.opentelemetry:opentelemetry-sdk:${versions.otelSdk}")
+ api("io.opentelemetry:opentelemetry-sdk-metrics:${versions.otelSdkMetrics}")
+
+ implementation("io.github.microutils:kotlin-logging")
}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt
index 1f4bb267..86f6647e 100644
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Event.kt
+++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,15 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.trace.core
+package org.opendc.telemetry.sdk
+
+import io.opentelemetry.sdk.common.Clock
/**
- * Base class for events reported by the OpenDC tracing library.
+ * An adapter class that bridges a [java.time.Clock] to an OpenTelemetry [Clock].
*/
-public abstract class Event(timestamp: Long = Long.MIN_VALUE) {
- /**
- * The timestamp at which the event has occurred.
- */
- public var timestamp: Long = timestamp
- internal set
+public class OtelClockAdapter(private val clock: java.time.Clock) : Clock {
+ override fun now(): Long = clock.millis()
+
+ override fun nanoTime(): Long = clock.millis() * 1_000_000L
}
+
+/**
+ * Convert the specified [java.time.Clock] to a [Clock].
+ */
+public fun java.time.Clock.toOtelClock(): Clock = OtelClockAdapter(this)
diff --git a/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
new file mode 100644
index 00000000..9ee16fac
--- /dev/null
+++ b/simulator/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.sdk.metrics.export
+
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import mu.KotlinLogging
+import java.util.*
+import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
+
+/**
+ * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every
+ * export interval.
+ *
+ * The reader runs in a [CoroutineScope] which enables collection of metrics in environments with a custom clock.
+ *
+ * @param scope The [CoroutineScope] to run the reader in.
+ * @param producers The metric producers to gather metrics from.
+ * @param exporter The exporter to export the metrics to.
+ * @param exportInterval The export interval in milliseconds.
+ */
+public class CoroutineMetricReader(
+ scope: CoroutineScope,
+ private val producers: List<MetricProducer>,
+ private val exporter: MetricExporter,
+ private val exportInterval: Long = 60_000
+) : AutoCloseable {
+ private val logger = KotlinLogging.logger {}
+ private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS)
+
+ /**
+ * The metric reader job.
+ */
+ private val readerJob = scope.launch {
+ while (isActive) {
+ delay(exportInterval)
+
+ val metrics = mutableListOf<MetricData>()
+ for (producer in producers) {
+ metrics.addAll(producer.collectAllMetrics())
+ }
+ chan.send(Collections.unmodifiableList(metrics))
+ }
+ }
+
+ /**
+ * The exporter job runs in the background to actually export the metrics.
+ */
+ private val exporterJob = chan.consumeAsFlow()
+ .onEach { metrics ->
+ suspendCoroutine<Unit> { cont ->
+ try {
+ val result = exporter.export(metrics)
+ result.whenComplete {
+ if (!result.isSuccess) {
+ logger.trace { "Exporter failed" }
+ }
+ cont.resume(Unit)
+ }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Exporter threw an Exception" }
+ cont.resume(Unit)
+ }
+ }
+ }
+ .launchIn(scope)
+
+ override fun close() {
+ readerJob.cancel()
+ exporterJob.cancel()
+ }
+}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt
deleted file mode 100644
index ac2b5e9b..00000000
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventStream.kt
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.core
-
-/**
- * A stream of [Event]s.
- */
-public interface EventStream : AutoCloseable {
- /**
- * Register the specified [action] to be performed on every event in the stream.
- */
- public fun onEvent(action: (Event) -> Unit)
-
- /**
- * Register the specified [action] to be performed on events of type [E].
- */
- public fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit)
-
- /**
- * Register the specified [action] to be performed on errors.
- */
- public fun onError(action: (Throwable) -> Unit)
-
- /**
- * Register the specified [action] to be performed when the stream is closed.
- */
- public fun onClose(action: Runnable)
-
- /**
- * Unregister the specified [action].
- *
- * @return `true` if an action was unregistered, `false` otherwise.
- */
- public fun remove(action: Any): Boolean
-
- /**
- * Start the processing of events in the current coroutine.
- *
- * @throws IllegalStateException if the stream was already started.
- */
- public suspend fun start()
-
- /**
- * Release all resources associated with this stream.
- *
- * @throws IllegalStateException if the stream was already stopped.
- */
- public override fun close()
-}
-
-/**
- * Register the specified [action] to be performed on events of type [E].
- */
-public inline fun <reified E : Event> EventStream.onEvent(noinline action: (E) -> Unit) {
- onEvent(E::class.java, action)
-}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt
deleted file mode 100644
index 4f978f4f..00000000
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/EventTracer.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.core
-
-import org.opendc.trace.core.internal.EventTracerImpl
-import java.time.Clock
-
-/**
- * An [EventTracer] is responsible for recording the events that occur in a system.
- */
-public interface EventTracer : AutoCloseable {
- /**
- * The [Clock] used to measure the timestamp and duration of the events.
- */
- public val clock: Clock
-
- /**
- * Determine whether the specified [Event] class is currently enabled in any of the active recordings.
- *
- * @return `true` if the event is enabled, `false` otherwise.
- */
- public fun isEnabled(type: Class<out Event>): Boolean
-
- /**
- * Commit the specified [event] to the appropriate event streams.
- */
- public fun commit(event: Event)
-
- /**
- * Create a new [RecordingStream] which is able to actively capture events emitted to the [EventTracer].
- */
- public fun openRecording(): RecordingStream
-
- /**
- * Terminate the lifecycle of the [EventTracer] and close its associated event streams.
- */
- public override fun close()
-
- public companion object {
- /**
- * Construct a new [EventTracer] instance.
- *
- * @param clock The [Clock] used to measure the timestamps.
- */
- @JvmName("create")
- public operator fun invoke(clock: Clock): EventTracer = EventTracerImpl(clock)
- }
-}
-
-/**
- * Determine whether the [Event] of type [E] is currently enabled in any of the active recordings.
- *
- * @return `true` if the event is enabled, `false` otherwise.
- */
-public inline fun <reified E : Event> EventTracer.isEnabled(): Boolean = isEnabled(E::class.java)
-
-/**
- * Lazily construct an [Event] of type [E] if it is enabled and commit it to the appropriate event streams.
- */
-public inline fun <reified E : Event> EventTracer.commit(block: () -> E) {
- if (isEnabled<E>()) {
- commit(block())
- }
-}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt
deleted file mode 100644
index 84dcc61a..00000000
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/Extensions.kt
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.core
-
-import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.awaitClose
-import kotlinx.coroutines.channels.sendBlocking
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.callbackFlow
-
-/**
- * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-public fun EventStream.asFlow(): Flow<Event> = callbackFlow {
- onEvent { sendBlocking(it) }
- onError { cancel(CancellationException("API error", it)) }
- onClose { channel.close() }
- awaitClose { this@asFlow.close() }
-}
-
-/**
- * Convert an [EventStream] to a [Flow] of [Event]s but do not start collection of the stream.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-public fun EventStream.consumeAsFlow(): Flow<Event> = callbackFlow {
- onEvent { sendBlocking(it) }
- onError { cancel(CancellationException("API error", it)) }
- start()
-}
-
-/**
- * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-public inline fun <reified E : Event> EventStream.asTypedFlow(): Flow<E> = callbackFlow {
- onEvent<E> { sendBlocking(it) }
- onError { cancel(CancellationException("API error", it)) }
- onClose { channel.close() }
- awaitClose { this@asTypedFlow.close() }
-}
-
-/**
- * Convert an [EventStream] to a [Flow] of [Event] of type [E] but do not start collection of the stream.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-public inline fun <reified E : Event> EventStream.consumeAsTypedFlow(): Flow<E> = callbackFlow {
- onEvent<E> { sendBlocking(it) }
- onError { cancel(CancellationException("API error", it)) }
- start()
-}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt
deleted file mode 100644
index f49e7c49..00000000
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/RecordingStream.kt
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.core
-
-/**
- * A recording stream that produces events from an [EventTracer].
- */
-public interface RecordingStream : EventStream {
- /**
- * Enable recording of the specified event [type].
- */
- public fun enable(type: Class<out Event>)
-
- /**
- * Disable recording of the specified event [type]
- */
- public fun disable(type: Class<out Event>)
-}
-
-/**
- * Enable recording of events of type [E].
- */
-public inline fun <reified E : Event> RecordingStream.enable() {
- enable(E::class.java)
-}
-
-/**
- * Disable recording of events of type [E].
- */
-public inline fun <reified E : Event> RecordingStream.disable() {
- enable(E::class.java)
-}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt
deleted file mode 100644
index fac99664..00000000
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/AbstractEventStream.kt
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.core.internal
-
-import kotlinx.coroutines.suspendCancellableCoroutine
-import org.opendc.trace.core.Event
-import org.opendc.trace.core.EventStream
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-
-/**
- * Base implementation of the [EventStream] implementation.
- */
-internal abstract class AbstractEventStream : EventStream {
- /**
- * The state of the stream.
- */
- protected var state = StreamState.Pending
-
- /**
- * The event actions to dispatch to.
- */
- private val eventActions = mutableListOf<EventDispatcher>()
-
- /**
- * The error actions to use.
- */
- private val errorActions = mutableListOf<(Throwable) -> Unit>()
-
- /**
- * The close actions to use.
- */
- private val closeActions = mutableListOf<Runnable>()
-
- /**
- * The continuation that is invoked when the stream closes.
- */
- private var cont: Continuation<Unit>? = null
-
- /**
- * Dispatch the specified [event] to this stream.
- */
- fun dispatch(event: Event) {
- val actions = eventActions
-
- // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type)
- for (action in actions) {
- if (!action.accepts(event)) {
- continue
- }
-
- try {
- action(event)
- } catch (e: Exception) {
- handleError(e)
- }
- }
- }
-
- /**
- * Handle the specified [throwable] that occurred while dispatching an event.
- */
- private fun handleError(throwable: Throwable) {
- val actions = errorActions
-
- // Default exception handler
- if (actions.isEmpty()) {
- throwable.printStackTrace()
- return
- }
-
- for (action in actions) {
- action(throwable)
- }
- }
-
- override fun onEvent(action: (Event) -> Unit) {
- eventActions += EventDispatcher(null, action)
- }
-
- override fun <E : Event> onEvent(type: Class<E>, action: (E) -> Unit) {
- @Suppress("UNCHECKED_CAST") // This cast must succeed
- eventActions += EventDispatcher(type, action as (Event) -> Unit)
- }
-
- override fun onError(action: (Throwable) -> Unit) {
- errorActions += action
- }
-
- override fun onClose(action: Runnable) {
- closeActions += action
- }
-
- override fun remove(action: Any): Boolean {
- return eventActions.removeIf { it.action == action } || errorActions.remove(action) || closeActions.remove(action)
- }
-
- override suspend fun start() {
- check(state == StreamState.Pending) { "Stream has already started/closed" }
-
- state = StreamState.Started
-
- return suspendCancellableCoroutine { cont -> this.cont = cont }
- }
-
- override fun close() {
- if (state == StreamState.Closed) {
- return
- }
-
- state = StreamState.Closed
- cont?.resume(Unit)
-
- val actions = closeActions
- for (action in actions) {
- action.run()
- }
- }
-}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt
deleted file mode 100644
index 8b6de75e..00000000
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/Dispatcher.kt
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.core.internal
-
-import org.opendc.trace.core.Event
-
-/**
- * The [Dispatcher] is responsible for dispatching events onto configured actions.
- */
-internal class Dispatcher {
- /**
- * The event actions to dispatch to.
- */
- private val eventActions = mutableListOf<EventDispatcher>()
-
- /**
- * The error actions to use.
- */
- private val errorActions = mutableListOf<(Throwable) -> Unit>()
-
- /**
- * Dispatch the specified [event].
- */
- fun dispatch(event: Event) {
- val actions = eventActions
-
- // TODO Opportunity for further optimizations if needed (e.g. dispatch based on event type)
- for (action in actions) {
- if (!action.accepts(event)) {
- continue
- }
-
- try {
- action(event)
- } catch (e: Exception) {
- handleError(e)
- }
- }
- }
-
- /**
- * Handle the specified [throwable] that occurred while dispatching an event.
- */
- private fun handleError(throwable: Throwable) {
- val actions = errorActions
-
- // Default exception handler
- if (actions.isEmpty()) {
- throwable.printStackTrace()
- return
- }
-
- for (action in actions) {
- action(throwable)
- }
- }
-}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt
deleted file mode 100644
index b2a662eb..00000000
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventDispatcher.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.core.internal
-
-import org.opendc.trace.core.Event
-
-/**
- * A dispatcher responsible for conditionally dispatching an event.
- */
-internal class EventDispatcher(val type: Class<out Event>?, val action: (Event) -> Unit) {
- /**
- * Determine whether this dispatcher accepts the specified event.
- */
- fun accepts(event: Event): Boolean {
- return type == null || type.isAssignableFrom(event.javaClass)
- }
-
- /**
- * Invoke the specified [event] on this action.
- */
- operator fun invoke(event: Event) {
- action(event)
- }
-}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt
deleted file mode 100644
index e85d0779..00000000
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/EventTracerImpl.kt
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.core.internal
-
-import org.opendc.trace.core.Event
-import org.opendc.trace.core.EventTracer
-import org.opendc.trace.core.RecordingStream
-import java.lang.reflect.Modifier
-import java.time.Clock
-import java.util.*
-
-/**
- * Default implementation of the [EventTracer] interface.
- */
-internal class EventTracerImpl(override val clock: Clock) : EventTracer {
- /**
- * The set of enabled events.
- */
- private val enabledEvents = IdentityHashMap<Class<out Event>, MutableList<Stream>>()
-
- /**
- * The event streams created by the tracer.
- */
- private val streams = WeakHashMap<Stream, Unit>()
-
- /**
- * A flag to indicate that the stream is closed.
- */
- private var isClosed: Boolean = false
-
- override fun isEnabled(type: Class<out Event>): Boolean = enabledEvents.containsKey(type)
-
- override fun commit(event: Event) {
- val type = event.javaClass
-
- // Assign timestamp if not set
- if (event.timestamp == Long.MIN_VALUE) {
- event.timestamp = clock.millis()
- }
-
- if (!isEnabled(type) || isClosed) {
- return
- }
-
- val streams = enabledEvents[type] ?: return
- for (stream in streams) {
- stream.dispatch(event)
- }
- }
-
- override fun openRecording(): RecordingStream = Stream()
-
- override fun close() {
- isClosed = true
-
- val streams = streams
- for ((stream, _) in streams) {
- stream.close()
- }
-
- enabledEvents.clear()
- }
-
- /**
- * Enable the specified [type] for the given [stream].
- */
- private fun enableFor(type: Class<out Event>, stream: Stream) {
- val res = enabledEvents.computeIfAbsent(type) { mutableListOf() }
- res.add(stream)
- }
-
- /**
- * Disable the specified [type] for the given [stream].
- */
- private fun disableFor(type: Class<out Event>, stream: Stream) {
- enabledEvents[type]?.remove(stream)
- }
-
- /**
- * The [RecordingStream] associated with this [EventTracer] implementation.
- */
- private inner class Stream : AbstractEventStream(), RecordingStream {
- /**
- * The set of enabled events for this stream.
- */
- private val enabledEvents = IdentityHashMap<Class<out Event>, Unit>()
-
- init {
- streams[this] = Unit
- }
-
- override fun enable(type: Class<out Event>) {
- validateEventClass(type)
-
- if (enabledEvents.put(type, Unit) == null && state == StreamState.Started) {
- enableFor(type, this)
- }
- }
-
- override fun disable(type: Class<out Event>) {
- validateEventClass(type)
-
- if (enabledEvents.remove(type) != null && state == StreamState.Started) {
- disableFor(type, this)
- }
- }
-
- override suspend fun start() {
- val enabledEvents = enabledEvents
- for ((event, _) in enabledEvents) {
- enableFor(event, this)
- }
-
- super.start()
- }
-
- override fun close() {
- val enabledEvents = enabledEvents
- for ((event, _) in enabledEvents) {
- disableFor(event, this)
- }
-
- // Remove this stream from the active streams
- streams.remove(this)
-
- super.close()
- }
-
- /**
- * Validate the specified event subclass.
- */
- private fun validateEventClass(type: Class<out Event>) {
- require(!Modifier.isAbstract(type.modifiers)) { "Abstract event classes are not allowed" }
- require(Event::class.java.isAssignableFrom(type)) { "Must be subclass to ${Event::class.qualifiedName}" }
- }
- }
-}
diff --git a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt b/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt
deleted file mode 100644
index 9f411e0d..00000000
--- a/simulator/opendc-trace/opendc-trace-core/src/main/kotlin/org/opendc/trace/core/internal/StreamState.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.core.internal
-
-/**
- * The state of a [Stream].
- */
-internal enum class StreamState {
- Pending, Started, Closed
-}
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
deleted file mode 100644
index 10f29f4e..00000000
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.utils.flow
-
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.channels.SendChannel
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.FlowCollector
-import kotlinx.coroutines.flow.consumeAsFlow
-
-/**
- * A [Flow] that can be used to emit events.
- */
-public interface EventFlow<T> : Flow<T> {
- /**
- * Emit the specified [event].
- */
- public fun emit(event: T)
-
- /**
- * Close the flow.
- */
- public fun close()
-}
-
-/**
- * Creates a new [EventFlow].
- */
-@Suppress("FunctionName")
-public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl()
-
-/**
- * Internal implementation of the [EventFlow] class.
- */
-@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
-private class EventFlowImpl<T> : EventFlow<T> {
- private var closed: Boolean = false
- private val subscribers = mutableListOf<SendChannel<T>>()
-
- override fun emit(event: T) {
- if (closed) {
- return
- }
-
- val it = subscribers.iterator()
- synchronized(this) {
- while (it.hasNext()) {
- val chan = it.next()
- if (chan.isClosedForSend) {
- it.remove()
- } else {
- chan.offer(event)
- }
- }
- }
- }
-
- override fun close() {
- synchronized(this) {
- closed = true
-
- for (chan in subscribers) {
- chan.close()
- }
-
- subscribers.clear()
- }
- }
-
- @InternalCoroutinesApi
- override suspend fun collect(collector: FlowCollector<T>) {
- val channel: Channel<T>
- synchronized(this) {
- if (closed) {
- return
- }
-
- channel = Channel(Channel.UNLIMITED)
- subscribers.add(channel)
- }
- try {
- channel.consumeAsFlow().collect(collector)
- } finally {
- channel.close()
- }
- }
-
- override fun toString(): String = "EventFlow"
-}
diff --git a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts
index bec18ae9..040a9ff6 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts
+++ b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -33,13 +33,14 @@ dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-workflow:opendc-workflow-api"))
api(project(":opendc-compute:opendc-compute-api"))
- api(project(":opendc-trace:opendc-trace-core"))
+ api(project(":opendc-telemetry:opendc-telemetry-api"))
implementation(project(":opendc-utils"))
implementation("io.github.microutils:kotlin-logging")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
testImplementation(project(":opendc-compute:opendc-compute-simulator"))
testImplementation(project(":opendc-format"))
+ testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl")
}
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt
deleted file mode 100644
index bb2ad6c6..00000000
--- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflow.service
-
-import org.opendc.trace.core.Event
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-
-/**
- * An event emitted by the [WorkflowService].
- */
-public sealed class WorkflowEvent : Event() {
- /**
- * The [WorkflowService] that emitted the event.
- */
- public abstract val service: WorkflowService
-
- /**
- * This event is emitted when a job was submitted to the scheduler.
- */
- public data class JobSubmitted(
- override val service: WorkflowService,
- public val job: Job
- ) : WorkflowEvent()
-
- /**
- * This event is emitted when a job has become active.
- */
- public data class JobStarted(
- override val service: WorkflowService,
- public val job: Job
- ) : WorkflowEvent()
-
- /**
- * This event is emitted when a job has finished processing.
- */
- public data class JobFinished(
- override val service: WorkflowService,
- public val job: Job
- ) : WorkflowEvent()
-
- /**
- * This event is emitted when a task of a job has started processing.
- */
- public data class TaskStarted(
- override val service: WorkflowService,
- public val job: Job,
- public val task: Task
- ) : WorkflowEvent()
-
- /**
- * This event is emitted when a task of a job has started processing.
- */
- public data class TaskFinished(
- override val service: WorkflowService,
- public val job: Job,
- public val task: Task
- ) : WorkflowEvent()
-}
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index 2f83e376..d3358ef1 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,9 +22,8 @@
package org.opendc.workflow.service
-import kotlinx.coroutines.flow.Flow
+import io.opentelemetry.api.metrics.Meter
import org.opendc.compute.api.ComputeClient
-import org.opendc.trace.core.EventTracer
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
@@ -42,14 +41,14 @@ import kotlin.coroutines.CoroutineContext
*/
public interface WorkflowService : AutoCloseable {
/**
- * The events emitted by the workflow scheduler.
+ * Submit the specified [Job] to the workflow service for scheduling.
*/
- public val events: Flow<WorkflowEvent>
+ public suspend fun submit(job: Job)
/**
- * Submit the specified [Job] to the workflow service for scheduling.
+ * Run the specified [Job] and suspend execution until the job is finished.
*/
- public suspend fun submit(job: Job)
+ public suspend fun run(job: Job)
/**
* Terminate the lifecycle of the workflow service, stopping all running workflows.
@@ -63,6 +62,7 @@ public interface WorkflowService : AutoCloseable {
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param tracer The event tracer to use.
+ * @param meter The meter to use.
* @param compute The compute client to use.
* @param mode The scheduling mode to use.
* @param jobAdmissionPolicy The job admission policy to use.
@@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- tracer: EventTracer,
+ meter: Meter,
compute: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable {
return WorkflowServiceImpl(
context,
clock,
- tracer,
+ meter,
compute,
mode,
jobAdmissionPolicy,
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 85a88acd..32191b8f 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -22,17 +22,11 @@
package org.opendc.workflow.service.internal
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.flow.Flow
+import io.opentelemetry.api.metrics.Meter
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.compute.api.*
-import org.opendc.trace.core.EventTracer
-import org.opendc.trace.core.consumeAsFlow
-import org.opendc.trace.core.enable
import org.opendc.workflow.api.Job
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
import org.opendc.workflow.service.*
@@ -43,7 +37,9 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import java.time.Clock
import java.util.*
+import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.resume
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
@@ -52,7 +48,7 @@ import kotlin.coroutines.CoroutineContext
public class WorkflowServiceImpl(
context: CoroutineContext,
internal val clock: Clock,
- internal val tracer: EventTracer,
+ private val meter: Meter,
private val computeClient: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -63,7 +59,7 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- internal val scope = CoroutineScope(context)
+ internal val scope = CoroutineScope(context + Job())
/**
* The logger instance to use.
@@ -106,6 +102,11 @@ public class WorkflowServiceImpl(
internal val taskByServer = mutableMapOf<Server, TaskState>()
/**
+ * The continuations of callers suspended in [run], keyed by job and resumed once that job finishes.
+ */
+ private val conts = mutableMapOf<Job, Continuation<Unit>>()
+
+ /**
* The root listener of this scheduler.
*/
private val rootListener = object : WorkflowSchedulerListener {
@@ -151,6 +152,54 @@ public class WorkflowServiceImpl(
}
}
+ /**
+ * The number of jobs that have been submitted to the service.
+ */
+ private val submittedJobs = meter.longCounterBuilder("jobs.submitted")
+ .setDescription("Number of submitted jobs")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of jobs that are running.
+ */
+ private val runningJobs = meter.longUpDownCounterBuilder("jobs.active")
+ .setDescription("Number of jobs running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of jobs that have finished running.
+ */
+ private val finishedJobs = meter.longCounterBuilder("jobs.finished")
+ .setDescription("Number of jobs that finished running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of tasks that have been submitted to the service.
+ */
+ private val submittedTasks = meter.longCounterBuilder("tasks.submitted")
+ .setDescription("Number of submitted tasks")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of tasks that are running.
+ */
+ private val runningTasks = meter.longUpDownCounterBuilder("tasks.active")
+ .setDescription("Number of tasks running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of tasks that have finished running.
+ */
+ private val finishedTasks = meter.longCounterBuilder("tasks.finished")
+ .setDescription("Number of tasks that finished running")
+ .setUnit("1")
+ .build()
+
private val mode: WorkflowSchedulerMode.Logic
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
@@ -167,16 +216,7 @@ public class WorkflowServiceImpl(
}
}
- override val events: Flow<WorkflowEvent> = tracer.openRecording().let {
- it.enable<WorkflowEvent.JobSubmitted>()
- it.enable<WorkflowEvent.JobStarted>()
- it.enable<WorkflowEvent.JobFinished>()
- it.enable<WorkflowEvent.TaskStarted>()
- it.enable<WorkflowEvent.TaskFinished>()
- it.consumeAsFlow().map { event -> event as WorkflowEvent }
- }
-
- override suspend fun submit(job: Job) {
+ override suspend fun run(job: Job) {
// J1 Incoming Jobs
val jobInstance = JobState(job, clock.millis())
val instances = job.tasks.associateWith {
@@ -193,14 +233,24 @@ public class WorkflowServiceImpl(
if (instance.isRoot) {
instance.state = TaskStatus.READY
}
+
+ submittedTasks.add(1)
}
- instances.values.toCollection(jobInstance.tasks)
- incomingJobs += jobInstance
- rootListener.jobSubmitted(jobInstance)
- tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job))
+ return suspendCancellableCoroutine { cont ->
+ instances.values.toCollection(jobInstance.tasks)
+ incomingJobs += jobInstance
+ rootListener.jobSubmitted(jobInstance)
+ conts[job] = cont
+
+ submittedJobs.add(1)
+
+ requestCycle()
+ }
+ }
- requestCycle()
+ override suspend fun submit(job: Job) {
+ scope.launch { run(job) }
}
override fun close() {
@@ -231,12 +281,8 @@ public class WorkflowServiceImpl(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- tracer.commit(
- WorkflowEvent.JobStarted(
- this,
- jobInstance.job
- )
- )
+
+ runningJobs.add(1)
rootListener.jobStarted(jobInstance)
}
@@ -311,18 +357,11 @@ public class WorkflowServiceImpl(
public override fun onStateChanged(server: Server, newState: ServerState) {
when (newState) {
- ServerState.PROVISIONING -> {
- }
+ ServerState.PROVISIONING -> {}
ServerState.RUNNING -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
- tracer.commit(
- WorkflowEvent.TaskStarted(
- this@WorkflowServiceImpl,
- task.job.job,
- task.task
- )
- )
+ runningTasks.add(1)
rootListener.taskStarted(task)
}
ServerState.TERMINATED, ServerState.ERROR -> {
@@ -338,13 +377,9 @@ public class WorkflowServiceImpl(
task.finishedAt = clock.millis()
job.tasks.remove(task)
activeTasks -= task
- tracer.commit(
- WorkflowEvent.TaskFinished(
- this@WorkflowServiceImpl,
- task.job.job,
- task.task
- )
- )
+
+ runningTasks.add(-1)
+ finishedTasks.add(1)
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -371,8 +406,11 @@ public class WorkflowServiceImpl(
private fun finishJob(job: JobState) {
activeJobs -= job
- tracer.commit(WorkflowEvent.JobFinished(this, job.job))
+ runningJobs.add(-1)
+ finishedJobs.add(1)
rootListener.jobFinished(job)
+
+ conts.remove(job.job)?.resume(Unit)
}
public fun addListener(listener: WorkflowSchedulerListener) {
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt
deleted file mode 100644
index 2161f5f2..00000000
--- a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflow.service
-
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.collect
-import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertNotEquals
-import org.junit.jupiter.api.DisplayName
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
-import org.opendc.compute.simulator.SimHost
-import org.opendc.format.environment.sc18.Sc18EnvironmentReader
-import org.opendc.format.trace.gwf.GwfTraceReader
-import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
-import org.opendc.simulator.utils.DelayControllerClockAdapter
-import org.opendc.trace.core.EventTracer
-import org.opendc.workflow.service.internal.WorkflowServiceImpl
-import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
-import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
-import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
-import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
-import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
-import kotlin.math.max
-
-/**
- * Integration test suite for the [WorkflowServiceImpl].
- */
-@DisplayName("WorkflowServiceImpl")
-@OptIn(ExperimentalCoroutinesApi::class)
-internal class StageWorkflowSchedulerIntegrationTest {
- /**
- * A large integration test where we check whether all tasks in some trace are executed correctly.
- */
- @Test
- fun testTrace() {
- var jobsSubmitted = 0L
- var jobsStarted = 0L
- var jobsFinished = 0L
- var tasksStarted = 0L
- var tasksFinished = 0L
-
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
- val tracer = EventTracer(clock)
-
- val scheduler = let {
- val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- testScope.coroutineContext,
- clock,
- SimSpaceSharedHypervisorProvider()
- )
- }
-
- val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000)
-
- hosts.forEach { compute.addHost(it) }
-
- WorkflowService(
- testScope.coroutineContext,
- clock,
- tracer,
- compute.newClient(),
- mode = WorkflowSchedulerMode.Batch(100),
- jobAdmissionPolicy = NullJobAdmissionPolicy,
- jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
- taskEligibilityPolicy = NullTaskEligibilityPolicy,
- taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
- )
- }
-
- testScope.launch {
- scheduler.events
- .onEach { event ->
- when (event) {
- is WorkflowEvent.JobStarted -> jobsStarted++
- is WorkflowEvent.JobFinished -> jobsFinished++
- is WorkflowEvent.TaskStarted -> tasksStarted++
- is WorkflowEvent.TaskFinished -> tasksFinished++
- }
- }
- .collect()
- }
-
- testScope.launch {
- val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
-
- while (reader.hasNext()) {
- val entry = reader.next()
- jobsSubmitted++
- delay(max(0, entry.start - clock.millis()))
- scheduler.submit(entry.workload)
- }
- }
-
- testScope.advanceUntilIdle()
-
- assertAll(
- { assertEquals(emptyList<Throwable>(), testScope.uncaughtExceptions) },
- { assertNotEquals(0, jobsSubmitted, "No jobs submitted") },
- { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") },
- { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") },
- { assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") }
- )
- }
-}
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
new file mode 100644
index 00000000..46c0d835
--- /dev/null
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service
+
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.DisplayName
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.simulator.SimHost
+import org.opendc.format.environment.sc18.Sc18EnvironmentReader
+import org.opendc.format.trace.gwf.GwfTraceReader
+import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
+import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
+import kotlin.math.max
+
+/**
+ * Integration test suite for the [WorkflowServiceImpl]: replays a GWF trace on simulated hosts and checks the exported workflow metrics.
+ */
+@DisplayName("WorkflowServiceImpl")
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class WorkflowServiceIntegrationTest {
+    /**
+     * A large integration test where we check whether all jobs and tasks in the `/trace.gwf` trace are executed to completion.
+     */
+    @Test
+    fun testTrace() = runBlockingTest {
+        val clock = DelayControllerClockAdapter(this)
+
+        val meterProvider: MeterProvider = SdkMeterProvider // real SDK provider: its metrics are collected and asserted on below
+            .builder()
+            .setClock(clock.toOtelClock())
+            .build()
+
+        val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
+            .use { it.read() }
+            .map { def ->
+                SimHost(
+                    def.uid,
+                    def.name,
+                    def.model,
+                    def.meta,
+                    coroutineContext,
+                    clock,
+                    MeterProvider.noop().get("opendc-compute-simulator"), // no-op meter: host metrics are not inspected by this test
+                    SimSpaceSharedHypervisorProvider()
+                )
+            }
+
+        val meter = MeterProvider.noop().get("opendc-compute") // no-op meter: compute metrics are not inspected by this test
+        val compute = ComputeService(coroutineContext, clock, meter, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000)
+
+        hosts.forEach { compute.addHost(it) }
+
+        val scheduler = WorkflowService(
+            coroutineContext,
+            clock,
+            meterProvider.get("opendc-workflow"), // the only meter backed by the SDK provider that is collected below
+            compute.newClient(),
+            mode = WorkflowSchedulerMode.Batch(100),
+            jobAdmissionPolicy = NullJobAdmissionPolicy,
+            jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
+            taskEligibilityPolicy = NullTaskEligibilityPolicy,
+            taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
+        )
+
+        val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
+        var offset = Long.MIN_VALUE // sentinel: replaced by the first entry's start offset inside the loop
+
+        coroutineScope { // returns only after every coroutine launched inside it has completed
+            while (reader.hasNext()) {
+                val entry = reader.next()
+
+                if (offset < 0) { // true for the sentinel (and any negative offset): shift the trace so this entry starts now
+                    offset = entry.start - clock.millis()
+                }
+
+                delay(max(0, (entry.start - offset) - clock.millis())) // wait until the entry's shifted start time; never negative
+                launch {
+                    scheduler.run(entry.workload)
+                }
+            }
+        }
+
+        hosts.forEach(SimHost::close)
+        scheduler.close()
+        compute.close()
+
+        val metrics = collectMetrics(meterProvider as MetricProducer)
+
+        assertAll(
+            { assertEquals(758, metrics.jobsSubmitted, "Unexpected number of jobs submitted") },
+            { assertEquals(0, metrics.jobsActive, "Some jobs are still active") },
+            { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all submitted jobs finished") },
+            { assertEquals(0, metrics.tasksActive, "Some tasks are still active") },
+            { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all submitted tasks finished") }
+        )
+    }
+
+    class WorkflowMetrics { // plain holder for the workflow counters read back by collectMetrics; all default to 0
+        var jobsSubmitted = 0L
+        var jobsActive = 0L
+        var jobsFinished = 0L
+        var tasksSubmitted = 0L
+        var tasksActive = 0L
+        var tasksFinished = 0L
+    }
+
+    /**
+     * Collect the metrics of the workflow service; a metric that is missing or has no recorded points is reported as 0.
+     */
+    private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics {
+        val metrics = metricProducer.collectAllMetrics().associateBy { it.name } // index the collected metrics by name
+        val res = WorkflowMetrics()
+        res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.lastOrNull()?.value ?: 0 // lastOrNull: empty points fall back to 0
+        res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.lastOrNull()?.value ?: 0
+        res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.lastOrNull()?.value ?: 0
+        res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.lastOrNull()?.value ?: 0
+        res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.lastOrNull()?.value ?: 0
+        res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.lastOrNull()?.value ?: 0
+        return res
+    }
+}
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml
index 70a0eacc..edcf41ed 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml
@@ -28,6 +28,9 @@
</Console>
</Appenders>
<Loggers>
+ <Logger name="org.opendc.workflow" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
<Root level="warn">
<AppenderRef ref="Console"/>
</Root>
diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts
index d5603664..2e297997 100644
--- a/simulator/settings.gradle.kts
+++ b/simulator/settings.gradle.kts
@@ -31,13 +31,13 @@ include(":opendc-serverless:opendc-serverless-api")
include(":opendc-serverless:opendc-serverless-service")
include(":opendc-serverless:opendc-serverless-simulator")
include(":opendc-format")
-include(":opendc-experiments:opendc-experiments-sc18")
include(":opendc-experiments:opendc-experiments-capelin")
include(":opendc-runner-web")
include(":opendc-simulator:opendc-simulator-core")
include(":opendc-simulator:opendc-simulator-resources")
include(":opendc-simulator:opendc-simulator-compute")
include(":opendc-simulator:opendc-simulator-failures")
-include(":opendc-trace:opendc-trace-core")
+include(":opendc-telemetry:opendc-telemetry-api")
+include(":opendc-telemetry:opendc-telemetry-sdk")
include(":opendc-harness")
include(":opendc-utils")