summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--buildSrc/src/main/kotlin/library.kt2
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt99
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/StateFlow.kt (renamed from odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/signal/Signal.kt)37
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt7
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt9
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt53
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt10
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt53
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt9
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt)18
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt19
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt)22
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt)30
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt220
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt3
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt58
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt67
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt)24
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt)2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt (renamed from opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt)145
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt14
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt25
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt12
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt11
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt158
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt12
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt4
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt26
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt15
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt (renamed from opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt)44
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt110
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt42
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FaultInjector.kt35
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt58
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt19
-rw-r--r--opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt20
-rw-r--r--opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt51
-rw-r--r--opendc/opendc-experiments-sc20/build.gradle.kts2
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt)23
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt84
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt15
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt9
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt8
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt3
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt38
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt76
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt9
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt2
-rw-r--r--opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt40
54 files changed, 1391 insertions, 547 deletions
diff --git a/buildSrc/src/main/kotlin/library.kt b/buildSrc/src/main/kotlin/library.kt
index 6333e351..3b05f3a4 100644
--- a/buildSrc/src/main/kotlin/library.kt
+++ b/buildSrc/src/main/kotlin/library.kt
@@ -45,5 +45,5 @@ object Library {
/**
* Kotlin coroutines support
*/
- val KOTLINX_COROUTINES = "1.3.4"
+ val KOTLINX_COROUTINES = "1.3.5"
}
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
new file mode 100644
index 00000000..5d9af9ec
--- /dev/null
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
@@ -0,0 +1,99 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.flow
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.SendChannel
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.FlowCollector
+import kotlinx.coroutines.flow.consumeAsFlow
+import java.util.WeakHashMap
+
+/**
+ * A [Flow] that can be used to emit events.
+ */
+public interface EventFlow<T> : Flow<T> {
+ /**
+ * Emit the specified [event].
+ */
+ public fun emit(event: T)
+
+ /**
+ * Close the flow.
+ */
+ public fun close()
+}
+
+/**
+ * Creates a new [EventFlow].
+ */
+@Suppress("FunctionName")
+public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl()
+
+/**
+ * Internal implementation of the [EventFlow] class.
+ */
+@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
+private class EventFlowImpl<T> : EventFlow<T> {
+ private var closed: Boolean = false
+ private val subscribers = WeakHashMap<SendChannel<T>, Unit>()
+
+ override fun emit(event: T) {
+ synchronized(this) {
+ for ((chan, _) in subscribers) {
+ chan.offer(event)
+ }
+ }
+ }
+
+ override fun close() {
+ synchronized(this) {
+ closed = true
+
+ for ((chan, _) in subscribers) {
+ chan.close()
+ }
+ }
+ }
+
+ @InternalCoroutinesApi
+ override suspend fun collect(collector: FlowCollector<T>) {
+ val channel: Channel<T>
+ synchronized(this) {
+ if (closed) {
+ return
+ }
+
+ channel = Channel(Channel.UNLIMITED)
+ subscribers[channel] = Unit
+ }
+ channel.consumeAsFlow().collect(collector)
+ }
+
+ override fun toString(): String = "EventFlow"
+}
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/signal/Signal.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/StateFlow.kt
index da6298a3..50add0ad 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/signal/Signal.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/StateFlow.kt
@@ -22,13 +22,13 @@
* SOFTWARE.
*/
-package com.atlarge.odcsim.signal
+package com.atlarge.odcsim.flow
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
-import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.asFlow
@@ -40,9 +40,9 @@ import kotlinx.coroutines.flow.asFlow
* in the future, but is not available yet.
* See: https://github.com/Kotlin/kotlinx.coroutines/pull/1354
*/
-public interface Signal<T> : Flow<T> {
+public interface StateFlow<T> : Flow<T> {
/**
- * The current value of this signal.
+ * The current value of this flow.
*
* Setting a value that is [equal][Any.equals] to the previous one does nothing.
*/
@@ -50,39 +50,30 @@ public interface Signal<T> : Flow<T> {
}
/**
- * Creates a [Signal] with a given initial [value].
+ * Creates a [StateFlow] with a given initial [value].
*/
@Suppress("FunctionName")
-public fun <T> Signal(value: T): Signal<T> = SignalImpl(value)
+public fun <T> StateFlow(value: T): StateFlow<T> = StateFlowImpl(value)
/**
- * Internal implementation of the [Signal] interface.
+ * Internal implementation of the [StateFlow] interface.
*/
-private class SignalImpl<T>(initialValue: T) : Signal<T> {
+@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
+private class StateFlowImpl<T>(initialValue: T) : StateFlow<T> {
/**
- * The [BroadcastChannel] to back this signal.
+ * The [BroadcastChannel] to back this flow.
*/
- @OptIn(ExperimentalCoroutinesApi::class)
- private val chan = BroadcastChannel<T>(Channel.CONFLATED)
+ private val chan = ConflatedBroadcastChannel(initialValue)
/**
- * The internal [Flow] backing this signal.
+ * The internal [Flow] backing this flow.
*/
- @OptIn(FlowPreview::class)
private val flow = chan.asFlow()
- init {
- @OptIn(ExperimentalCoroutinesApi::class)
- chan.offer(initialValue)
- }
-
- @OptIn(ExperimentalCoroutinesApi::class)
public override var value: T = initialValue
set(value) {
- if (field != value) {
- chan.offer(value)
- field = value
- }
+ chan.offer(value)
+ field = value
}
@InternalCoroutinesApi
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index 4edf94d2..934af293 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -37,6 +37,7 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
@@ -174,6 +175,10 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine
}
}
+ private val exceptionHandler = CoroutineExceptionHandler { _, exception ->
+ log.error("Uncaught exception", exception)
+ }
+
// SimulationContext
override val key: CoroutineContext.Key<*> = SimulationContext.Key
@@ -192,7 +197,7 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine
override val parent: Domain = parent ?: this
@InternalCoroutinesApi
- override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job
+ override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job + exceptionHandler
override fun toString(): String = path
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index 86ec9a5b..01968cd8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,7 +28,7 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
-import com.atlarge.opendc.core.services.ServiceRegistryImpl
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -68,7 +68,12 @@ public data class Server(
/**
* The services published by this server.
*/
- public val serviceRegistry: ServiceRegistry = ServiceRegistryImpl()
+ public val services: ServiceRegistry,
+
+ /**
+ * The events that are emitted by the server.
+ */
+ public val events: Flow<ServerEvent>
) : Resource {
override fun hashCode(): Int = uid.hashCode()
override fun equals(other: Any?): Boolean = other is Server && uid == other.uid
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt
new file mode 100644
index 00000000..1595937c
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt
@@ -0,0 +1,53 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.core
+
+import com.atlarge.opendc.core.services.ServiceKey
+
+/**
+ * An event that is emitted by a [Server].
+ */
+public sealed class ServerEvent {
+ /**
+ * The server that emitted the event.
+ */
+ public abstract val server: Server
+
+ /**
+ * This event is emitted when the state of [server] changes.
+ *
+ * @property server The server of which the state changed.
+ * @property previousState The previous state of the server.
+ */
+ public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent()
+
+ /**
+ * This event is emitted when a server publishes a service.
+ *
+ * @property server The server that published the service.
+ * @property key The service key of the service that was published.
+ */
+ public data class ServicePublished(override val server: Server, val key: ServiceKey<*>) : ServerEvent()
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
index b09a5a7d..e0a491c8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt
@@ -27,10 +27,10 @@ package com.atlarge.opendc.compute.core.execution
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.core.services.AbstractServiceKey
+import com.atlarge.opendc.core.services.ServiceKey
/**
- * Represents the execution context in which an bootable [Image] runs on a [Server].
+ * Represents the execution context in which a bootable [Image] runs on a [Server].
*/
public interface ServerContext {
/**
@@ -44,11 +44,9 @@ public interface ServerContext {
public val cpus: List<ProcessingUnit>
/**
- * Publishes the given [service] with key [serviceKey] in the server's registry.
+ * Publish the specified [service] at the given [ServiceKey].
*/
- public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) {
- server.serviceRegistry[serviceKey] = service
- }
+ public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T)
/**
* Request the specified burst time from the processor cores and suspend execution until a processor core finishes
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt
new file mode 100644
index 00000000..e4da557b
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt
@@ -0,0 +1,53 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.core.execution
+
+import kotlinx.coroutines.CancellationException
+
+/**
+ * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown signal
+ * has been sent to the server.
+ */
+public class ShutdownException(message: String? = null, override val cause: Throwable? = null) : CancellationException(message)
+
+/**
+ * This method terminates the currently active coroutine by rethrowing this [CancellationException]
+ * if it is a [ShutdownException], i.e. if the cancellation was caused by a shutdown.
+ */
+public fun CancellationException.assertShutdown() {
+ if (this is ShutdownException) {
+ throw this
+ }
+}
+
+/**
+ * This method terminates the currently active coroutine by rethrowing this [CancellationException]
+ * if it is a [ShutdownException] with a non-null cause, i.e. if the shutdown was caused by a failure.
+ */
+public fun CancellationException.assertFailure() {
+ if (this is ShutdownException && cause != null) {
+ throw this
+ }
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
index 107237ea..e77b55a6 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
@@ -26,7 +26,7 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
-import kotlinx.coroutines.isActive
+import kotlinx.coroutines.ensureActive
import java.util.UUID
import kotlin.coroutines.coroutineContext
import kotlin.math.min
@@ -64,11 +64,8 @@ data class FlopsApplicationImage(
val burst = LongArray(cores) { flops / cores }
val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization }
- while (coroutineContext.isActive) {
- if (burst.all { it == 0L }) {
- break
- }
-
+ while (burst.any { it != 0L }) {
+ coroutineContext.ensureActive()
ctx.run(burst, maxUsage, Long.MAX_VALUE)
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt
index 5fce3f48..a3a851fe 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt
@@ -24,17 +24,11 @@
package com.atlarge.opendc.compute.metal
-/**
- * The power state of a compute node.
+/*
+ * Common metadata keys for bare-metal nodes.
*/
-public enum class PowerState {
- /**
- * Node is powered on.
- */
- POWER_ON,
- /**
- * Node is powered off.
- */
- POWER_OFF,
-}
+/**
+ * The cluster to which the node belongs.
+ */
+const val NODE_CLUSTER = "bare-metal:cluster"
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index a43abfe9..7cb4c0c5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -27,12 +27,13 @@ package com.atlarge.opendc.compute.metal
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
* A bare-metal compute node.
*/
-data class Node(
+public data class Node(
/**
* The unique identifier of the node.
*/
@@ -44,9 +45,14 @@ data class Node(
public override val name: String,
/**
- * The power state of the node.
+ * Metadata of the node.
*/
- public val powerState: PowerState,
+ public val metadata: Map<String, Any>,
+
+ /**
+ * The last known state of the compute node.
+ */
+ public val state: NodeState,
/**
* The boot image of the node.
@@ -56,7 +62,12 @@ data class Node(
/**
* The server instance that is running on the node or `null` if no server is running.
*/
- public val server: Server?
+ public val server: Server?,
+
+ /**
+ * The events that are emitted by the node.
+ */
+ public val events: Flow<NodeEvent>
) : Identity {
override fun hashCode(): Int = uid.hashCode()
override fun equals(other: Any?): Boolean = other is Node && uid == other.uid
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt
index fbfd0ad6..7719db24 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt
@@ -22,20 +22,22 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.core.monitor
-
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
+package com.atlarge.opendc.compute.metal
/**
- * An interface for monitoring the state of a machine.
+ * An event that is emitted by a [Node].
*/
-public interface ServerMonitor {
+public sealed class NodeEvent {
+ /**
+ * The node that emitted the event.
+ */
+ public abstract val node: Node
+
/**
- * This method is invoked when the state of a machine updates.
+ * This event is emitted when the state of [node] changes.
*
- * @param server The server which state was updated.
- * @param previousState The previous state of the server.
+ * @property node The node of which the state changed.
+ * @property previousState The previous state of the node.
*/
- public suspend fun onUpdate(server: Server, previousState: ServerState)
+ public data class StateChanged(override val node: Node, val previousState: NodeState) : NodeEvent()
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt
index 3c77d57a..ca9cf509 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt
@@ -22,32 +22,34 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.workflows.monitor
-
-import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
+package com.atlarge.opendc.compute.metal
/**
- * An interface for monitoring the progression of workflows.
+ * An enumeration describing the possible states of a bare-metal compute node.
*/
-public interface WorkflowMonitor {
+public enum class NodeState {
+ /**
+ * The node is booting.
+ */
+ BOOT,
+
/**
- * This method is invoked when a job has become active.
+ * The node is powered off.
*/
- public suspend fun onJobStart(job: Job, time: Long)
+ SHUTOFF,
/**
- * This method is invoked when a job has finished processing.
+ * The node is active and running.
*/
- public suspend fun onJobFinish(job: Job, time: Long)
+ ACTIVE,
/**
- * This method is invoked when a task of a job has started processing.
+ * The node is in error.
*/
- public suspend fun onTaskStart(job: Job, task: Task, time: Long)
+ ERROR,
/**
- * This method is invoked when a task has finished processing.
+ * The state of the node is unknown.
*/
- public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long)
+ UNKNOWN,
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 1214dd36..41cec291 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -26,9 +26,8 @@ package com.atlarge.opendc.compute.metal.driver
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.PowerState
+import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.power.Powerable
import com.atlarge.opendc.core.services.AbstractServiceKey
import kotlinx.coroutines.flow.Flow
@@ -37,7 +36,12 @@ import java.util.UUID
/**
* A driver interface for the management interface of a bare-metal compute node.
*/
-public interface BareMetalDriver : Powerable {
+public interface BareMetalDriver : Powerable, FailureDomain {
+ /**
+ * The [Node] that is controlled by this driver.
+ */
+ public val node: Flow<Node>
+
/**
* The amount of work done by the machine in percentage with respect to the total amount of processing power
* available.
@@ -47,12 +51,22 @@ public interface BareMetalDriver : Powerable {
/**
* Initialize the driver.
*/
- public suspend fun init(monitor: ServerMonitor): Node
+ public suspend fun init(): Node
+
+ /**
+ * Start the bare metal node with its currently configured boot disk image.
+ */
+ public suspend fun start(): Node
+
+ /**
+ * Stop the bare metal node if it is running.
+ */
+ public suspend fun stop(): Node
/**
- * Update the power state of the compute node.
+ * Reboot the bare metal node.
*/
- public suspend fun setPower(powerState: PowerState): Node
+ public suspend fun reboot(): Node
/**
* Update the boot disk image of the compute node.
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index c7dc74cf..4a40dc9f 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -25,23 +25,31 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.signal.Signal
+import com.atlarge.odcsim.flow.EventFlow
+import com.atlarge.odcsim.flow.StateFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
+import com.atlarge.opendc.compute.core.execution.ShutdownException
+import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.EmptyImage
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.PowerState
+import com.atlarge.opendc.compute.metal.NodeEvent
+import com.atlarge.opendc.compute.metal.NodeState
import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
+import com.atlarge.opendc.core.services.ServiceKey
+import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
@@ -50,6 +58,7 @@ import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
import kotlinx.coroutines.withContext
+import java.lang.Exception
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
@@ -57,6 +66,7 @@ import kotlinx.coroutines.withContext
* @param domain The simulation domain the driver runs in.
* @param uid The unique identifier of the machine.
* @param name The name of the machine.
+ * @param metadata The initial metadata of the node.
* @param cpus The CPUs available to the bare metal machine.
* @param memoryUnits The memory units in this machine.
* @param powerModel The power model of this machine.
@@ -65,128 +75,175 @@ public class SimpleBareMetalDriver(
private val domain: Domain,
uid: UUID,
name: String,
+ metadata: Map<String, Any>,
val cpus: List<ProcessingUnit>,
val memoryUnits: List<MemoryUnit>,
- powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel(0.0)
+ powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel(
+ 0.0
+ )
) : BareMetalDriver {
/**
- * The monitor to use.
+ * The flavor that corresponds to this machine.
*/
- private lateinit var monitor: ServerMonitor
+ private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum())
/**
- * The machine state.
+ * The current active server context.
*/
- private var node: Node = Node(uid, name, PowerState.POWER_OFF, EmptyImage, null)
+ private var serverContext: BareMetalServerContext? = null
/**
- * The flavor that corresponds to this machine.
+ * The events of the machine.
*/
- private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum())
+ private val events = EventFlow<NodeEvent>()
/**
- * The job that is running the image.
+ * The flow containing the load of the server.
*/
- private var job: Job? = null
+ private val usageState = StateFlow(0.0)
/**
- * The signal containing the load of the server.
+ * The machine state.
*/
- private val usageSignal = Signal(0.0)
+ private val nodeState = StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events))
+
+ override val node: Flow<Node> = nodeState
- override val usage: Flow<Double> = usageSignal
+ override val usage: Flow<Double> = usageState
override val powerDraw: Flow<Double> = powerModel(this)
- override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
- this@SimpleBareMetalDriver.monitor = monitor
- return@withContext node
+ override suspend fun init(): Node = withContext(domain.coroutineContext) {
+ nodeState.value
}
- override suspend fun setPower(powerState: PowerState): Node = withContext(domain.coroutineContext) {
- val previousPowerState = node.powerState
- val server = when (node.powerState to powerState) {
- PowerState.POWER_OFF to PowerState.POWER_OFF -> null
- PowerState.POWER_OFF to PowerState.POWER_ON -> Server(
- UUID.randomUUID(),
- node.name,
- emptyMap(),
- flavor,
- node.image,
- ServerState.BUILD
- )
- PowerState.POWER_ON to PowerState.POWER_OFF -> null // TODO Terminate existing image
- PowerState.POWER_ON to PowerState.POWER_ON -> node.server
- else -> throw IllegalStateException()
+ override suspend fun start(): Node = withContext(domain.coroutineContext) {
+ val node = nodeState.value
+ if (node.state != NodeState.SHUTOFF) {
+ return@withContext node
}
- server?.serviceRegistry?.set(BareMetalDriver.Key, this@SimpleBareMetalDriver)
- node = node.copy(powerState = powerState, server = server)
- if (powerState != previousPowerState && server != null) {
- launch()
+ val events = EventFlow<ServerEvent>()
+ val server = Server(
+ UUID.randomUUID(),
+ node.name,
+ emptyMap(),
+ flavor,
+ node.image,
+ ServerState.BUILD,
+ ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver),
+ events
+ )
+
+ setNode(node.copy(state = NodeState.BOOT, server = server))
+ serverContext = BareMetalServerContext(events)
+ return@withContext nodeState.value
+ }
+
+ override suspend fun stop(): Node = withContext(domain.coroutineContext) {
+ val node = nodeState.value
+ if (node.state == NodeState.SHUTOFF) {
+ return@withContext node
}
+ // We terminate the image running on the machine
+ serverContext!!.cancel(fail = false)
+ serverContext = null
+
+ setNode(node.copy(state = NodeState.SHUTOFF, server = null))
return@withContext node
}
+ override suspend fun reboot(): Node = withContext(domain.coroutineContext) {
+ stop()
+ start()
+ }
+
override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
- node = node.copy(image = image)
- return@withContext node
+ setNode(nodeState.value.copy(image = image))
+ return@withContext nodeState.value
}
- override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node }
+ override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value }
- /**
- * Launch the server image on the machine.
- */
- private suspend fun launch() {
- val serverContext = serverCtx
+ private fun setNode(value: Node) {
+ val field = nodeState.value
+ if (field.state != value.state) {
+ events.emit(NodeEvent.StateChanged(value, field.state))
+ }
- job = domain.launch {
- serverContext.init()
- try {
- node.server!!.image(serverContext)
- serverContext.exit()
- } catch (cause: Throwable) {
- serverContext.exit(cause)
- }
+ if (field.server != null && value.server != null && field.server.state != value.server.state) {
+ serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state))
}
+
+ nodeState.value = value
}
- private val serverCtx = object : ServerManagementContext {
- private var initialized: Boolean = false
+ private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
+ private var finalized: Boolean = false
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
- override var server: Server
- get() = node.server!!
- set(value) {
- node = node.copy(server = value)
+ override val server: Server
+ get() = nodeState.value.server!!
+
+ private val job = domain.launch {
+ delay(1) // TODO Introduce boot time
+ init()
+ try {
+ server.image(this@BareMetalServerContext)
+ exit()
+ } catch (cause: Throwable) {
+ exit(cause)
}
+ }
+
+ /**
+ * Cancel the image running on the machine.
+ */
+ suspend fun cancel(fail: Boolean) {
+ if (fail)
+ job.cancel(ShutdownException(cause = Exception("Random failure")))
+ else
+ job.cancel(ShutdownException())
+ job.join()
+ }
+
+ override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
+ val server = server.copy(services = server.services.put(key, service))
+ setNode(nodeState.value.copy(server = server))
+ events.emit(ServerEvent.ServicePublished(server, key))
+ }
override suspend fun init() {
- if (initialized) {
- throw IllegalStateException()
- }
+ assert(!finalized) { "Machine is already finalized" }
- val previousState = server.state
- server = server.copy(state = ServerState.ACTIVE)
- monitor.onUpdate(server, previousState)
- initialized = true
+ val server = server.copy(state = ServerState.ACTIVE)
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
}
override suspend fun exit(cause: Throwable?) {
- val previousState = server.state
- val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR
- server = server.copy(state = state)
- initialized = false
- domain.launch { monitor.onUpdate(server, previousState) }
+ finalized = true
+
+ val newServerState =
+ if (cause == null || (cause is ShutdownException && cause.cause == null))
+ ServerState.SHUTOFF
+ else
+ ServerState.ERROR
+ val newNodeState =
+ if (cause == null || (cause is ShutdownException && cause.cause != null))
+ nodeState.value.state
+ else
+ NodeState.ERROR
+ val server = server.copy(state = newServerState)
+ setNode(nodeState.value.copy(state = newNodeState, server = server))
}
private var flush: Job? = null
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
require(burst.size == limit.size) { "Array dimensions do not match" }
+ assert(!finalized) { "Server instance is already finalized" }
// If run is called at the same timestamp as the previous call, cancel the load flush
flush?.cancel()
@@ -209,19 +266,20 @@ public class SimpleBareMetalDriver(
}
}
- usageSignal.value = totalUsage / cpus.size
+ usageState.value = totalUsage / cpus.size
try {
delay(duration)
- } catch (_: CancellationException) {
- // On cancellation, we compute and return the remaining burst
+ } catch (e: CancellationException) {
+ // On non-failure cancellation, we compute and return the remaining burst
+ e.assertFailure()
}
val end = simulationContext.clock.millis()
// Flush the load if we do not receive a new run call for the same timestamp
- flush = domain.launch {
+ flush = domain.launch(job) {
delay(1)
- usageSignal.value = 0.0
+ usageState.value = 0.0
}
flush!!.invokeOnCompletion {
flush = null
@@ -235,4 +293,14 @@ public class SimpleBareMetalDriver(
}
}
}
+
+ override val scope: CoroutineScope
+ get() = domain
+
+ override suspend fun fail() {
+ serverContext?.cancel(fail = true)
+ domain.cancel()
+ }
+
+ override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})"
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
index 24ade799..105505f2 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import com.atlarge.opendc.core.services.AbstractServiceKey
@@ -53,7 +52,7 @@ public interface ProvisioningService {
/**
* Deploy the specified [Image] on a compute node.
*/
- public suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node
+ public suspend fun deploy(node: Node, image: Image): Node
/**
* The service key of this service.
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index b18a4006..a7e143aa 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -25,31 +25,22 @@
package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.Domain
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.PowerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
import kotlinx.coroutines.withContext
/**
* A very basic implementation of the [ProvisioningService].
*/
-public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, ServerMonitor {
+public class SimpleProvisioningService(val domain: Domain) : ProvisioningService {
/**
* The active nodes in this service.
*/
private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf()
- /**
- * The installed monitors.
- */
- private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf()
-
override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) {
- val node = driver.init(this@SimpleProvisioningService)
+ val node = driver.init()
nodes[node] = driver
return@withContext node
}
@@ -60,19 +51,10 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService
return@withContext nodes[node]!!.refresh()
}
- override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
+ override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) {
val driver = nodes[node]!!
-
driver.setImage(image)
- driver.setPower(PowerState.POWER_OFF)
- val newNode = driver.setPower(PowerState.POWER_ON)
- monitors[newNode.server!!] = monitor
+ val newNode = driver.reboot()
return@withContext newNode
}
-
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- withContext(domain.coroutineContext) {
- monitors[server]?.onUpdate(server, previousState)
- }
- }
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
new file mode 100644
index 00000000..69b0124d
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -0,0 +1,58 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.virt
+
+import com.atlarge.opendc.core.Identity
+import kotlinx.coroutines.flow.Flow
+import java.util.UUID
+
+/**
+ * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
+ * into several virtual guest machines.
+ */
+public class Hypervisor(
+ /**
+ * The unique identifier of the hypervisor.
+ */
+ override val uid: UUID,
+
+ /**
+     * The name of the hypervisor.
+ */
+ override val name: String,
+
+ /**
+ * Metadata of the hypervisor.
+ */
+ public val metadata: Map<String, Any>,
+
+ /**
+ * The events that are emitted by the hypervisor.
+ */
+ public val events: Flow<HypervisorEvent>
+) : Identity {
+ override fun hashCode(): Int = uid.hashCode()
+ override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
new file mode 100644
index 00000000..5c19b00d
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -0,0 +1,67 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.virt
+
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+
+/**
+ * An event that is emitted by a [VirtDriver].
+ */
+public sealed class HypervisorEvent {
+ /**
+ * The driver that emitted the event.
+ */
+ public abstract val driver: VirtDriver
+
+ /**
+ * This event is emitted when the number of active servers on the server managed by this driver is updated.
+ *
+ * @property driver The driver that emitted the event.
+ * @property numberOfActiveServers The number of active servers.
+ * @property availableMemory The available memory, in MB.
+ */
+ public data class VmsUpdated(
+ override val driver: VirtDriver,
+ public val numberOfActiveServers: Int,
+ public val availableMemory: Long
+ ) : HypervisorEvent()
+
+ /**
+ * This event is emitted when a slice is finished.
+ *
+ * @property driver The driver that emitted the event.
+ * @property requestedBurst The total requested CPU time (can be above capacity).
+ * @property grantedBurst The actual total granted capacity.
+ * @property numberOfDeployedImages The number of images deployed on this hypervisor.
+ */
+ public data class SliceFinished(
+ override val driver: VirtDriver,
+ public val requestedBurst: Long,
+ public val grantedBurst: Long,
+ public val numberOfDeployedImages: Int,
+ public val hostServer: Server
+ ) : HypervisorEvent()
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index 8d055953..c21b002d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -22,32 +22,36 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import com.atlarge.opendc.core.resource.TagContainer
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.UUID
/**
* A hypervisor managing the VMs of a node.
*/
-class HypervisorImage(
- private val hypervisorMonitor: HypervisorMonitor
-) : Image {
+object HypervisorImage : Image {
override val uid: UUID = UUID.randomUUID()
override val name: String = "vmm"
override val tags: TagContainer = emptyMap()
override suspend fun invoke(ctx: ServerContext) {
- val driver = HypervisorVirtDriver(ctx, hypervisorMonitor)
+ coroutineScope {
+ val driver = SimpleVirtDriver(ctx, this)
+ ctx.publishService(VirtDriver.Key, driver)
- ctx.publishService(VirtDriver.Key, driver)
-
- // Suspend image until it is cancelled
- suspendCancellableCoroutine<Unit> {}
+ // Suspend image until it is cancelled
+ try {
+ suspendCancellableCoroutine<Unit> {}
+ } finally {
+ driver.eventFlow.close()
+ }
+ }
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt
index 926234b5..0586ae00 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt
@@ -1,3 +1,3 @@
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt.driver
public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.")
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 430e5a37..76368080 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -22,26 +22,34 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt.driver
-import com.atlarge.odcsim.SimulationContext
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
+import com.atlarge.opendc.compute.core.execution.ShutdownException
+import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import com.atlarge.opendc.compute.virt.HypervisorEvent
+import com.atlarge.opendc.core.services.ServiceKey
+import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
@@ -51,11 +59,18 @@ import kotlin.math.min
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
*/
-class HypervisorVirtDriver(
+@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
+class SimpleVirtDriver(
private val hostContext: ServerContext,
- private val monitor: HypervisorMonitor
+ private val coroutineScope: CoroutineScope
) : VirtDriver {
/**
+ * The [Server] on which this hypervisor runs.
+ */
+ private val server: Server
+ get() = hostContext.server
+
+ /**
* A set for tracking the VM context objects.
*/
internal val vms: MutableSet<VmServerContext> = mutableSetOf()
@@ -66,31 +81,38 @@ class HypervisorVirtDriver(
private var availableMemory: Long = hostContext.server.flavor.memorySize
/**
- * Monitors to keep informed.
+ * The [EventFlow] to emit the events.
*/
- private val monitors: MutableSet<VirtDriverMonitor> = mutableSetOf()
+ internal val eventFlow = EventFlow<HypervisorEvent>()
- override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server {
+ override val events: Flow<HypervisorEvent> = eventFlow
+
+ override suspend fun spawn(
+ name: String,
+ image: Image,
+ flavor: Flavor
+ ): Server {
val requiredMemory = flavor.memorySize
if (availableMemory - requiredMemory < 0) {
throw InsufficientMemoryOnServerException()
}
require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" }
- val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD)
+ val events = EventFlow<ServerEvent>()
+ val server = Server(
+ UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD,
+ ServiceRegistry(), events
+ )
availableMemory -= requiredMemory
- vms.add(VmServerContext(server, monitor, simulationContext))
- monitors.forEach { it.onUpdate(vms.size, availableMemory) }
+ vms.add(VmServerContext(server, events, simulationContext.domain))
+ eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
- override suspend fun addMonitor(monitor: VirtDriverMonitor) {
- monitors.add(monitor)
- }
-
- override suspend fun removeMonitor(monitor: VirtDriverMonitor) {
- monitors.remove(monitor)
- }
+ /**
+ * A flag to indicate the driver is stopped.
+ */
+ private var stopped: Boolean = false
/**
* The set of [VmServerContext] instances that is being scheduled at the moment.
@@ -108,12 +130,12 @@ class HypervisorVirtDriver(
private suspend fun reschedule() {
flush()
- // Do not schedule a call if there is no work to schedule
- if (activeVms.isEmpty()) {
+ // Do not schedule a call if there is no work to schedule or the driver stopped.
+ if (stopped || activeVms.isEmpty()) {
return
}
- val call = simulationContext.domain.launch {
+ val call = coroutineScope.launch {
val start = simulationContext.clock.millis()
val vms = activeVms.toSet()
@@ -200,16 +222,9 @@ class HypervisorVirtDriver(
}
}
- monitor.onSliceFinish(
- end,
- totalBurst,
- totalBurst - totalRemainder,
- vms.size,
- hostContext.server
- )
+ eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size, server))
}
this.call = call
- call.invokeOnCompletion { this.call = null }
}
/**
@@ -217,9 +232,10 @@ class HypervisorVirtDriver(
*/
private fun flush() {
val call = call ?: return // If there is no active call, there is nothing to flush
- // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its
+        // The progress is actually flushed in the coroutine when it notices that we cancelled it; we do not wait for its
// completion.
call.cancel()
+ this.call = null
}
/**
@@ -238,17 +254,19 @@ class HypervisorVirtDriver(
}
internal inner class VmServerContext(
- override var server: Server,
- val monitor: ServerMonitor,
- ctx: SimulationContext
+ server: Server,
+ val events: EventFlow<ServerEvent>,
+ val domain: Domain
) : ServerManagementContext {
+ private var finalized: Boolean = false
lateinit var requests: List<CpuRequest>
lateinit var burst: LongArray
var deadline: Long = 0L
var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
- internal val job: Job = ctx.domain.launch {
+ internal val job: Job = coroutineScope.launch {
+ delay(1) // TODO Introduce boot time
init()
try {
server.image(this@VmServerContext)
@@ -258,28 +276,42 @@ class HypervisorVirtDriver(
}
}
+ override var server: Server = server
+ set(value) {
+ if (field.state != value.state) {
+ events.emit(ServerEvent.StateChanged(value, field.state))
+ }
+
+ field = value
+ }
+
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
+ override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) {
+ server = server.copy(services = server.services.put(key, service))
+ events.emit(ServerEvent.ServicePublished(server, key))
+ }
+
override suspend fun init() {
- if (initialized) {
- throw IllegalStateException()
- }
+ assert(!finalized) { "VM is already finalized" }
- val previousState = server.state
server = server.copy(state = ServerState.ACTIVE)
- monitor.onUpdate(server, previousState)
initialized = true
}
override suspend fun exit(cause: Throwable?) {
- val previousState = server.state
- val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR
- server = server.copy(state = state)
+ finalized = true
+
+ val serverState =
+ if (cause == null || (cause is ShutdownException && cause.cause == null))
+ ServerState.SHUTOFF
+ else
+ ServerState.ERROR
+ server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
- monitor.onUpdate(server, previousState)
- initialized = false
vms.remove(this)
- monitors.forEach { it.onUpdate(vms.size, availableMemory) }
+ events.close()
+ eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
}
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
@@ -289,7 +321,14 @@ class HypervisorVirtDriver(
this.burst = burst
requests = cpus.asSequence()
.take(burst.size)
- .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) }
+ .mapIndexed { i, cpu ->
+ CpuRequest(
+ this,
+ cpu,
+ burst[i],
+ limit[i]
+ )
+ }
.toList()
// Wait until the burst has been run or the coroutine is cancelled
@@ -297,11 +336,13 @@ class HypervisorVirtDriver(
activeVms += this
reschedule()
chan.receive()
- } catch (_: CancellationException) {
+ } catch (e: CancellationException) {
// On cancellation, we compute and return the remaining burst
+ e.assertFailure()
+ } finally {
+ activeVms -= this
+ reschedule()
}
- activeVms -= this
- reschedule()
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index d889d0f9..1002d382 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -27,8 +27,9 @@ package com.atlarge.opendc.compute.virt.driver
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -37,28 +38,19 @@ import java.util.UUID
*/
public interface VirtDriver {
/**
+ * The events emitted by the driver.
+ */
+ public val events: Flow<HypervisorEvent>
+
+ /**
* Spawn the given [Image] on the compute resource of this driver.
*
+ * @param name The name of the server to spawn.
* @param image The image to deploy.
- * @param monitor The monitor to use for the deployment of this particular image.
* @param flavor The flavor of the server which this driver is controlling.
* @return The virtual server spawned by this method.
*/
- public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server
-
- /**
- * Adds the given [VirtDriverMonitor] to the list of monitors to keep informed on the state of this driver.
- *
- * @param monitor The monitor to keep informed.
- */
- public suspend fun addMonitor(monitor: VirtDriverMonitor)
-
- /**
- * Removes the given [VirtDriverMonitor] from the list of monitors.
- *
- * @param monitor The monitor to unsubscribe
- */
- public suspend fun removeMonitor(monitor: VirtDriverMonitor)
+ public suspend fun spawn(name: String, image: Image, flavor: Flavor): Server
companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver")
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt
deleted file mode 100644
index cf2f4619..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.atlarge.opendc.compute.virt.driver
-
-/**
- * Monitor for entities interested in the state of a [VirtDriver].
- */
-interface VirtDriverMonitor {
- /**
- * Called when the number of active servers on the server managed by this driver is updated.
- *
- * @param numberOfActiveServers The number of active servers.
- * @param availableMemory The available memory, in MB.
- */
- public suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long)
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt
deleted file mode 100644
index 1e3981f6..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.atlarge.opendc.compute.virt.monitor
-
-import com.atlarge.opendc.compute.core.Server
-
-/**
- * Monitoring interface for hypervisor-specific events.
- */
-interface HypervisorMonitor {
- /**
- * Invoked after a scheduling slice has finished processed.
- *
- * @param time The current time (in ms).
- * @param requestedBurst The total requested CPU time (can be above capacity).
- * @param grantedBurst The actual total granted capacity.
- * @param numberOfDeployedImages The number of images deployed on this hypervisor.
- * @param hostServer The server hosting this hypervisor.
- */
- suspend fun onSliceFinish(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- numberOfDeployedImages: Int,
- hostServer: Server
- )
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt
new file mode 100644
index 00000000..97842f18
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt
@@ -0,0 +1,12 @@
+package com.atlarge.opendc.compute.virt.service
+
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
+
+class HypervisorView( // Mutable bookkeeping record that SimpleVirtProvisioningService keeps per hypervisor server; allocation policies compare these views.
+ var server: Server, // The server the hypervisor image was deployed on (the view is created when that server becomes ACTIVE).
+ var numberOfActiveServers: Int, // Number of VMs on this hypervisor: starts at 0, adjusted speculatively while scheduling and overwritten by HypervisorEvent.VmsUpdated.
+ var availableMemory: Long // Memory still free on this hypervisor: starts at server.flavor.memorySize and is overwritten by HypervisorEvent.VmsUpdated.
+) {
+ lateinit var driver: VirtDriver // Assigned once the server publishes VirtDriver.Key; being lateinit, reading it before that throws.
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt
deleted file mode 100644
index 41e67624..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.atlarge.opendc.compute.virt.service
-
-import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
-
-class NodeView(
- val node: Node,
- val hypervisor: HypervisorImage,
- var numberOfActiveServers: Int,
- var availableMemory: Long
-)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 17960186..156521db 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -3,123 +3,175 @@ package com.atlarge.opendc.compute.virt.service
import com.atlarge.odcsim.SimulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
-import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
-import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import com.atlarge.opendc.compute.virt.HypervisorImage
+import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
+import com.atlarge.opendc.core.services.ServiceKey
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlinx.coroutines.withContext
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+@OptIn(ExperimentalCoroutinesApi::class)
class SimpleVirtProvisioningService(
public override val allocationPolicy: AllocationPolicy,
private val ctx: SimulationContext,
- private val provisioningService: ProvisioningService,
- private val hypervisorMonitor: HypervisorMonitor
-) : VirtProvisioningService, ServerMonitor {
+ private val provisioningService: ProvisioningService
+) : VirtProvisioningService, CoroutineScope by ctx.domain {
/**
- * The nodes that are controlled by the service.
+ * The hypervisors that have been launched by the service.
*/
- internal lateinit var nodes: List<Node>
+ private val hypervisors: MutableMap<Server, HypervisorView> = mutableMapOf()
/**
- * The available nodes.
+ * The available hypervisors.
*/
- internal val availableNodes: MutableSet<NodeView> = mutableSetOf()
+ private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf()
/**
* The incoming images to be processed by the provisioner.
*/
- internal val incomingImages: MutableSet<ImageView> = mutableSetOf()
+ private val incomingImages: MutableSet<ImageView> = mutableSetOf()
/**
* The active images in the system.
*/
- internal val activeImages: MutableSet<ImageView> = mutableSetOf()
+ private val activeImages: MutableSet<ImageView> = mutableSetOf()
init {
- ctx.domain.launch {
- val provisionedNodes = provisioningService.nodes().toList()
- val deployedNodes = provisionedNodes.map { node ->
- val hypervisorImage = HypervisorImage(hypervisorMonitor)
- val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService)
- val nodeView = NodeView(
- deployedNode,
- hypervisorImage,
- 0,
- deployedNode.server!!.flavor.memorySize
- )
- yield()
- deployedNode.server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor {
- override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) {
- nodeView.numberOfActiveServers = numberOfActiveServers
- nodeView.availableMemory = availableMemory
+ launch {
+ val provisionedNodes = provisioningService.nodes()
+ provisionedNodes.forEach { node ->
+ val hypervisorImage = HypervisorImage
+ val node = provisioningService.deploy(node, hypervisorImage)
+ node.server!!.events.onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> stateChanged(event.server)
+ is ServerEvent.ServicePublished -> servicePublished(event.server, event.key)
}
- })
- nodeView
+ }.launchIn(this)
}
- nodes = deployedNodes.map { it.node }
- availableNodes.addAll(deployedNodes)
}
}
- override suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor) {
- val vmInstance = ImageView(image, monitor, flavor)
- incomingImages += vmInstance
- requestCycle()
+ override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) {
+ availableHypervisors.map { it.driver }.toSet()
+ }
+
+ override suspend fun deploy(
+ name: String,
+ image: Image,
+ flavor: Flavor
+ ): Server = withContext(coroutineContext) {
+ suspendCancellableCoroutine<Server> { cont ->
+ val vmInstance = ImageView(name, image, flavor, cont)
+ incomingImages += vmInstance
+ requestCycle()
+ }
}
+ private var call: Job? = null
+
private fun requestCycle() {
- ctx.domain.launch {
+ if (call != null) {
+ return
+ }
+
+ val call = launch {
+ delay(1)
+ this@SimpleVirtProvisioningService.call = null
schedule()
}
+ this.call = call
}
private suspend fun schedule() {
val imagesToBeScheduled = incomingImages.toSet()
for (imageInstance in imagesToBeScheduled) {
- println("Spawning $imageInstance")
+ val selectedHv = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break
+ try {
+ println("Spawning ${imageInstance.image}")
+ incomingImages -= imageInstance
- val selectedNode = availableNodes.minWith(allocationPolicy().thenBy { it.node.uid })
+ // Speculatively update the hypervisor view information to prevent other images in the queue from
+ // deciding on stale values.
+ selectedHv.numberOfActiveServers++
+ selectedHv.availableMemory -= (imageInstance.image as VmImage).requiredMemory // XXX Temporary hack
- try {
- imageInstance.server = selectedNode?.node!!.server!!.serviceRegistry[VirtDriver.Key].spawn(
+ val server = selectedHv.driver.spawn(
+ imageInstance.name,
imageInstance.image,
- imageInstance.monitor,
imageInstance.flavor
)
+ imageInstance.server = server
+ imageInstance.continuation.resume(server)
activeImages += imageInstance
} catch (e: InsufficientMemoryOnServerException) {
println("Unable to deploy image due to insufficient memory")
- }
- incomingImages -= imageInstance
+ selectedHv.numberOfActiveServers--
+ selectedHv.availableMemory += (imageInstance.image as VmImage).requiredMemory
+ }
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
- // TODO handle hypervisor server becoming active
+ val hvView = HypervisorView(
+ server,
+ 0,
+ server.flavor.memorySize
+ )
+ hypervisors[server] = hvView
}
ServerState.SHUTOFF, ServerState.ERROR -> {
- // TODO handle hypervisor server shutting down or failing
+ val hv = hypervisors[server] ?: return
+ availableHypervisors -= hv
+ requestCycle()
}
else -> throw IllegalStateException()
}
}
- class ImageView(
+ private fun servicePublished(server: Server, key: ServiceKey<*>) {
+ if (key == VirtDriver.Key) {
+ val hv = hypervisors[server] ?: return
+ hv.driver = server.services[VirtDriver]
+ availableHypervisors += hv
+
+ hv.driver.events
+ .onEach { event ->
+ if (event is HypervisorEvent.VmsUpdated) {
+ hv.numberOfActiveServers = event.numberOfActiveServers
+ hv.availableMemory = event.availableMemory
+ }
+ }.launchIn(this)
+
+ requestCycle()
+ }
+ }
+
+ data class ImageView(
+ val name: String,
val image: Image,
- val monitor: ServerMonitor,
val flavor: Flavor,
+ val continuation: Continuation<Server>,
var server: Server? = null
)
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 7770ec50..a3ade2fb 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -1,8 +1,9 @@
package com.atlarge.opendc.compute.virt.service
import com.atlarge.opendc.compute.core.Flavor
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
/**
@@ -12,11 +13,16 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
+ * Obtain the active hypervisors for this provisioner.
+ */
+ public suspend fun drivers(): Set<VirtDriver>
+
+ /**
* Submit the specified [Image] to the provisioning service.
*
+ * @param name The name of the server to deploy.
* @param image The image to be deployed.
- * @param monitor The monitor to inform on events.
* @param flavor The flavor of the machine instance to run this [image] on.
*/
- public suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor)
+ public suspend fun deploy(name: String, image: Image, flavor: Flavor): Server
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt
index a1c0ab9a..e2871cca 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt
@@ -1,7 +1,7 @@
package com.atlarge.opendc.compute.virt.service.allocation
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.virt.service.NodeView
+import com.atlarge.opendc.compute.virt.service.HypervisorView
/**
* A policy for selecting the [Node] an image should be deployed to,
@@ -10,5 +10,5 @@ interface AllocationPolicy {
/**
* Builds the logic of the policy.
*/
- operator fun invoke(): Comparator<NodeView>
+ operator fun invoke(): Comparator<HypervisorView>
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt
index b3e9d77e..f095849b 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt
@@ -1,12 +1,12 @@
package com.atlarge.opendc.compute.virt.service.allocation
-import com.atlarge.opendc.compute.virt.service.NodeView
+import com.atlarge.opendc.compute.virt.service.HypervisorView
/**
* Allocation policy that selects the node with the most available memory.
*/
class AvailableMemoryAllocationPolicy : AllocationPolicy {
- override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 ->
+ override fun invoke(): Comparator<HypervisorView> = Comparator { o1, o2 ->
compareValuesBy(o1, o2) { -it.availableMemory }
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt
index 9d6582dd..59e48465 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt
@@ -1,13 +1,13 @@
package com.atlarge.opendc.compute.virt.service.allocation
-import com.atlarge.opendc.compute.virt.service.NodeView
+import com.atlarge.opendc.compute.virt.service.HypervisorView
import kotlinx.coroutines.runBlocking
/**
* Allocation policy that selects the node with the least amount of active servers.
*/
class NumberOfActiveServersAllocationPolicy : AllocationPolicy {
- override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 ->
+ override fun invoke(): Comparator<HypervisorView> = Comparator { o1, o2 ->
runBlocking {
compareValuesBy(o1, o2) { it.numberOfActiveServers }
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index b8882eda..0fc64373 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -25,14 +25,12 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.metal.PowerState
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
@@ -55,21 +53,19 @@ internal class SimpleBareMetalDriverTest {
val dom = root.newDomain(name = "driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList())
-
- val monitor = object : ServerMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("[${simulationContext.clock.millis()}] $server")
- finalState = server.state
- }
- }
+ val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2)
// Batch driver commands
withContext(dom.coroutineContext) {
- driver.init(monitor)
+ driver.init()
driver.setImage(image)
- driver.setPower(PowerState.POWER_ON)
+ val server = driver.start().server!!
+ server.events.collect { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> { println(event); finalState = event.server.state }
+ }
+ }
}
}
@@ -78,6 +74,6 @@ internal class SimpleBareMetalDriverTest {
system.terminate()
}
- assertEquals(finalState, ServerState.SHUTOFF)
+ assertEquals(ServerState.SHUTOFF, finalState)
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index a837130d..f8bd786e 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -27,12 +27,10 @@ package com.atlarge.opendc.compute.metal.service
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
@@ -53,23 +51,18 @@ internal class SimpleProvisioningServiceTest {
val root = system.newDomain(name = "root")
root.launch {
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
- val monitor = object : ServerMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println(server)
- }
- }
-
val dom = root.newDomain("provisioner")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList())
+ val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val provisioner = SimpleProvisioningService(dom)
provisioner.create(driver)
delay(5)
val nodes = provisioner.nodes()
- provisioner.deploy(nodes.first(), image, monitor)
+ val node = provisioner.deploy(nodes.first(), image)
+ node.server!!.events.collect { println(it) }
}
runBlocking {
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 254ad5fe..58d784b0 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -22,22 +22,19 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.compute.virt.driver.hypervisor
+package com.atlarge.opendc.compute.virt
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
-import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.metal.PowerState
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
@@ -51,6 +48,7 @@ internal class HypervisorTest {
/**
* A smoke test for the bare-metal driver.
*/
+ @OptIn(ExperimentalCoroutinesApi::class)
@Test
fun smoke() {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
@@ -58,41 +56,29 @@ internal class HypervisorTest {
val root = system.newDomain("root")
root.launch {
- val vmm = HypervisorImage(object : HypervisorMonitor {
- override suspend fun onSliceFinish(
- time: Long,
- requestedBurst: Long,
- grantedBurst: Long,
- numberOfDeployedImages: Int,
- hostServer: Server
- ) {
- println("Hello World!")
- }
- })
+ val vmm = HypervisorImage
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
- val monitor = object : ServerMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("[${simulationContext.clock.millis()}]: $server")
- }
- }
val driverDom = root.newDomain("driver")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
val cpus = List(2) { ProcessingUnit(cpuNode, it, 2000.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
- metalDriver.init(monitor)
+ metalDriver.init()
metalDriver.setImage(vmm)
- metalDriver.setPower(PowerState.POWER_ON)
+ val node = metalDriver.start()
+ node.server?.events?.onEach { println(it) }?.launchIn(this)
delay(5)
val flavor = Flavor(1, 0)
- val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver]
- vmDriver.spawn(workloadA, monitor, flavor)
- vmDriver.spawn(workloadB, monitor, flavor)
+ val vmDriver = metalDriver.refresh().server!!.services[VirtDriver]
+ val vmA = vmDriver.spawn("a", workloadA, flavor)
+ vmA.events.onEach { println(it) }.launchIn(this)
+ val vmB = vmDriver.spawn("b", workloadB, flavor)
+ vmB.events.onEach { println(it) }.launchIn(this)
}
runBlocking {
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
new file mode 100644
index 00000000..da4dee12
--- /dev/null
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
@@ -0,0 +1,110 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.core.failure
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.ensureActive
+import kotlinx.coroutines.launch
+import kotlin.math.exp
+import kotlin.random.Random
+import kotlin.random.asJavaRandom
+
+/**
+ * A [FaultInjector] that injects correlated faults into the system: failures do not occur in isolation, but a single
+ * fault event takes down a randomly chosen group of the registered failure domains at once.
+ *
+ * Both the time between fault events and the size of the group are drawn from log-normal distributions, see
+ * [lognvariate].
+ *
+ * @param domain The simulation domain in which the injector launches its coroutines.
+ * @param iatScale The scale parameter of the log-normal inter-arrival time distribution (drawn in hours).
+ * @param iatShape The shape parameter of the log-normal inter-arrival time distribution.
+ * @param sizeScale The scale parameter of the log-normal distribution of the number of domains failing together.
+ * @param sizeShape The shape parameter of the log-normal distribution of the number of domains failing together.
+ * @param random The source of randomness used for sampling and for picking the domains that fail.
+ */
+public class CorrelatedFaultInjector(
+    private val domain: Domain,
+    private val iatScale: Double,
+    private val iatShape: Double,
+    private val sizeScale: Double,
+    private val sizeShape: Double,
+    random: Random = Random
+) : FaultInjector {
+    /**
+     * The active failure domains that have been registered.
+     */
+    private val active = mutableSetOf<FailureDomain>()
+
+    /**
+     * The [Job] that awaits the nearest fault in the system.
+     */
+    private var job: Job? = null
+
+    /**
+     * The [Random] instance to use.
+     */
+    private val random: java.util.Random = random.asJavaRandom()
+
+    /**
+     * Enqueue the specified [FailureDomain] to fail some time in the future.
+     *
+     * The domain is tracked until the [Job] of its scope completes. The first call also starts the background job
+     * that periodically fails a random subset of the tracked domains; later calls reuse that job.
+     *
+     * @param domain The failure domain to register. Its scope must contain a [Job].
+     */
+    override fun enqueue(domain: FailureDomain) {
+        active += domain
+
+        // Clean up the domain if it finishes
+        domain.scope.coroutineContext[Job]!!.invokeOnCompletion {
+            // Mutate the injector state from within the injector's own domain.
+            this@CorrelatedFaultInjector.domain.launch {
+                active -= domain
+
+                // Nothing left to fail: stop the fault job so a later enqueue can start a fresh one.
+                if (active.isEmpty()) {
+                    job?.cancel()
+                    job = null
+                }
+            }
+        }
+
+        if (job != null) {
+            return
+        }
+
+        job = this.domain.launch {
+            while (true) {
+                ensureActive()
+
+                // Make sure to convert delay from hours to milliseconds (1 h = 3 600 000 ms)
+                val d = lognvariate(iatScale, iatShape) * 3.6e6
+
+                // Double.toLong() saturates at Long.MAX_VALUE, so do the overflow check in Long arithmetic:
+                // the delay is non-negative, hence the sum only drops below `now` when it has wrapped around.
+                val delayMillis = d.toLong()
+                val now = simulationContext.clock.millis()
+                if (now + delayMillis < now) {
+                    return@launch
+                }
+
+                delay(delayMillis)
+
+                // Fail up to `n` randomly selected domains together.
+                val n = lognvariate(sizeScale, sizeShape).toInt()
+                for (failureDomain in active.shuffled(random).take(n)) {
+                    failureDomain.fail()
+                }
+            }
+        }
+    }
+
+    /**
+     * Draw a sample from a log-normal distribution, computed as `exp(scale + shape * Z)` where `Z` is the next
+     * Gaussian value of [random].
+     */
+    // XXX We should extract this in some common package later on.
+    private fun lognvariate(scale: Double, shape: Double) = exp(scale + shape * random.nextGaussian())
+}
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
new file mode 100644
index 00000000..91ca9b83
--- /dev/null
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
@@ -0,0 +1,42 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.core.failure
+
+import kotlinx.coroutines.CoroutineScope
+
+/**
+ * A logical or physical component in a computing environment which may fail.
+ *
+ * Instances are handed to a [FaultInjector] through [FaultInjector.enqueue].
+ */
+public interface FailureDomain {
+ /**
+ * The lifecycle of the failure domain to which a [FaultInjector] will attach.
+ *
+ * [CorrelatedFaultInjector] looks up the job in this scope's coroutine context (and requires one to be present) so
+ * that it can stop tracking the domain once that job completes.
+ */
+ public val scope: CoroutineScope
+
+ /**
+ * Fail the domain externally.
+ *
+ * [CorrelatedFaultInjector] calls this on every domain it selects when a fault event occurs.
+ */
+ public suspend fun fail()
+}
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FaultInjector.kt
new file mode 100644
index 00000000..ac7a08de
--- /dev/null
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FaultInjector.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.core.failure
+
+/**
+ * An interface for stochastically injecting faults into a running system.
+ */
+public interface FaultInjector {
+ /**
+ * Enqueue the specified [FailureDomain] as a candidate for failure injection in the future.
+ *
+ * @param domain The failure domain that may be failed by this injector.
+ */
+ public fun enqueue(domain: FailureDomain)
+}
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
new file mode 100644
index 00000000..3883eb11
--- /dev/null
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
@@ -0,0 +1,58 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.core.failure
+
+import com.atlarge.odcsim.simulationContext
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlin.math.ln1p
+import kotlin.math.pow
+import kotlin.random.Random
+
+/**
+ * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are
+ * independent.
+ *
+ * Every enqueued [FailureDomain] gets its own time-to-failure, sampled from a Weibull distribution.
+ *
+ * @param alpha The shape parameter of the Weibull distribution.
+ * @param beta The scale parameter of the Weibull distribution (samples are interpreted as seconds).
+ * @param random The source of randomness used for sampling.
+ */
+public class UncorrelatedFaultInjector(
+    private val alpha: Double,
+    private val beta: Double,
+    private val random: Random = Random
+) : FaultInjector {
+    /**
+     * Enqueue the specified [FailureDomain] to fail some time in the future.
+     *
+     * A coroutine is launched in the scope of [domain] that waits for the sampled time-to-failure and then calls
+     * [FailureDomain.fail]. The failure is skipped when the failure instant is non-positive or too large to be
+     * represented as a [Long] timestamp.
+     */
+    override fun enqueue(domain: FailureDomain) {
+        domain.scope.launch {
+            val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds
+
+            // Handle long overflow: adding a Long and a Double yields a Double, which never wraps around to a
+            // negative value, so the upper bound has to be checked explicitly before converting to Long.
+            val deadline = simulationContext.clock.millis() + d
+            if (deadline <= 0 || deadline >= Long.MAX_VALUE.toDouble()) {
+                return@launch
+            }
+
+            delay(d.toLong())
+            domain.fail()
+        }
+    }
+
+    // XXX We should extract this in some common package later on.
+    /**
+     * Draw a sample from the Weibull distribution with shape [alpha] and scale [beta] by applying the inverse of
+     * its cumulative distribution function to a uniform sample.
+     */
+    private fun Random.weibull(alpha: Double, beta: Double) = (beta * (-ln1p(-nextDouble())).pow(1.0 / alpha))
+}
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt
index d9a85231..75aa778f 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt
@@ -25,10 +25,15 @@
package com.atlarge.opendc.core.services
/**
- * A service registry for a datacenter zone.
+ * An immutable service registry interface.
*/
public interface ServiceRegistry {
/**
+ * The keys in this registry.
+ */
+ public val keys: Collection<ServiceKey<*>>
+
+ /**
* Determine if this map contains the service with the specified [ServiceKey].
*
* @param key The key of the service to check for.
@@ -41,12 +46,18 @@ public interface ServiceRegistry {
*
* @param key The key of the service to obtain.
* @return The references to the service.
- * @throws IllegalArgumentException if the key does not exists in the map.
+ * @throws IllegalArgumentException if the key does not exist in the map.
*/
public operator fun <T : Any> get(key: ServiceKey<T>): T
/**
- * Register the specified [ServiceKey] in this registry.
+ * Return the result of associating the specified [service] with the given [key] in this registry.
*/
- public operator fun <T : Any> set(key: ServiceKey<T>, service: T)
+ public fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry
}
+
+/**
+ * Construct an empty [ServiceRegistry].
+ *
+ * Services are associated with keys through [ServiceRegistry.put], which returns the resulting registry.
+ */
+@Suppress("FunctionName")
+public fun ServiceRegistry(): ServiceRegistry = ServiceRegistryImpl(emptyMap())
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt
index 91147839..0686ebaf 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt
@@ -27,20 +27,18 @@ package com.atlarge.opendc.core.services
/**
* Default implementation of the [ServiceRegistry] interface.
*/
-public class ServiceRegistryImpl : ServiceRegistry {
- /**
- * The map containing the registered services.
- */
- private val services: MutableMap<ServiceKey<*>, Any> = mutableMapOf()
+internal class ServiceRegistryImpl(private val map: Map<ServiceKey<*>, Any>) : ServiceRegistry {
+ override val keys: Collection<ServiceKey<*>>
+ get() = map.keys
- override fun <T : Any> set(key: ServiceKey<T>, service: T) {
- services[key] = service
- }
-
- override fun contains(key: ServiceKey<*>): Boolean = key in services
+ override fun contains(key: ServiceKey<*>): Boolean = key in map
override fun <T : Any> get(key: ServiceKey<T>): T {
+ // A missing key must raise the IllegalArgumentException documented on ServiceRegistry.get rather than
+ // surface as a failed null cast.
@Suppress("UNCHECKED_CAST")
- return services[key] as T
+ return requireNotNull(map[key]) { "No service registered for key $key" } as T
}
+
+ override fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry = ServiceRegistryImpl(map.plus(key to service))
+
+ override fun toString(): String = map.toString()
}
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index d5e1404a..b0182ab3 100644
--- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -29,8 +29,8 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.WorkflowEvent
import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
@@ -38,12 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
@@ -62,28 +62,6 @@ fun main(args: Array<String>) {
var finished = 0
val token = Channel<Boolean>()
-
- val monitor = object : WorkflowMonitor {
- override suspend fun onJobStart(job: Job, time: Long) {
- println("Job ${job.uid} started")
- }
-
- override suspend fun onJobFinish(job: Job, time: Long) {
- finished += 1
- println("Jobs $finished/$total finished (${job.tasks.size} tasks)")
-
- if (finished == total) {
- token.send(true)
- }
- }
-
- override suspend fun onTaskStart(job: Job, task: Task, time: Long) {
- }
-
- override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) {
- }
- }
-
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider(name = "sim")
@@ -106,6 +84,27 @@ fun main(args: Array<String>) {
}
val broker = system.newDomain(name = "broker")
+
+ broker.launch {
+ val scheduler = schedulerAsync.await()
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is WorkflowEvent.JobStarted -> {
+ println("Job ${event.job.uid} started")
+ }
+ is WorkflowEvent.JobFinished -> {
+ finished += 1
+ println("Jobs $finished/$total finished (${event.job.tasks.size} tasks)")
+
+ if (finished == total) {
+ token.send(true)
+ }
+ }
+ }
+ }
+ .collect()
+ }
broker.launch {
val ctx = simulationContext
val reader = GwfTraceReader(File(args[0]))
@@ -115,7 +114,7 @@ fun main(args: Array<String>) {
val (time, job) = reader.next()
total += 1
delay(max(0, time * 1000 - ctx.clock.millis()))
- scheduler.submit(job, monitor)
+ scheduler.submit(job)
}
}
diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts
index d3b37336..28b8ae12 100644
--- a/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -41,7 +41,9 @@ dependencies {
implementation("com.xenomachina:kotlin-argparser:2.0.7")
api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")
+ runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
runtimeOnly(project(":odcsim:odcsim-engine-omega"))
+
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 9e8f0fa8..36da7703 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -1,23 +1,32 @@
package com.atlarge.opendc.experiments.sc20
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import kotlinx.coroutines.flow.first
import java.io.BufferedWriter
import java.io.Closeable
import java.io.FileWriter
-class Sc20HypervisorMonitor(
+class Sc20Monitor(
destination: String
-) : HypervisorMonitor, Closeable {
+) : Closeable {
private val outputFile = BufferedWriter(FileWriter(destination))
+ private var failed: Int = 0
init {
- outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw\n")
+ outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
}
- override suspend fun onSliceFinish(
+    /**
+     * Print the current simulation time together with the uid and state of [server], and count the server as
+     * failed when it is in the [ServerState.ERROR] state.
+     */
+    suspend fun stateChanged(server: Server) {
+        println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}")
+
+        // Only servers in the ERROR state contribute to the failure counter.
+        if (server.state != ServerState.ERROR) {
+            return
+        }
+        failed += 1
+    }
+
+ suspend fun onSliceFinish(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
@@ -25,11 +34,11 @@ class Sc20HypervisorMonitor(
hostServer: Server
) {
// Assume for now that the host is not virtualized and measure the current power draw
- val driver = hostServer.serviceRegistry[BareMetalDriver.Key]
+ val driver = hostServer.services[BareMetalDriver.Key]
val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
- outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw")
+ outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed")
outputFile.newLine()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index f0d3fc8d..639c3aef 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -24,15 +24,19 @@
package com.atlarge.opendc.experiments.sc20
+import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
+import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
+import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
+import com.atlarge.opendc.core.failure.FailureDomain
+import com.atlarge.opendc.core.failure.FaultInjector
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader
@@ -40,11 +44,15 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
-import java.io.FileInputStream
import java.io.FileReader
import java.util.ServiceLoader
import kotlin.math.max
@@ -81,27 +89,35 @@ class ExperimentParameters(parser: ArgParser) {
}
/**
+ * Obtain the [FaultInjector] to use for the experiments.
+ */
+fun createFaultInjector(domain: Domain): FaultInjector =
+    // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+    CorrelatedFaultInjector(
+        domain,
+        iatScale = -1.39,
+        iatShape = 1.03,
+        sizeScale = 1.88,
+        sizeShape = 1.25
+    )
+
+/**
* Main entry point of the experiment.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
fun main(args: Array<String>) {
ArgParser(args).parseInto(::ExperimentParameters).run {
- val hypervisorMonitor = Sc20HypervisorMonitor(outputFile)
- val monitor = object : ServerMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println(server)
- }
- }
+ val monitor = Sc20Monitor(outputFile)
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider("test")
val root = system.newDomain("root")
+ val chan = Channel<Unit>(Channel.CONFLATED)
root.launch {
val environment = Sc20ClusterEnvironmentReader(File(environmentFile))
.use { it.construct(root) }
val performanceInterferenceStream = if (performanceInterferenceFile != null) {
- FileInputStream(File(performanceInterferenceFile!!))
+ File(performanceInterferenceFile!!).inputStream().buffered()
} else {
object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
}
@@ -111,18 +127,56 @@ fun main(args: Array<String>) {
println(simulationContext.clock.instant())
+ val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
val scheduler = SimpleVirtProvisioningService(
AvailableMemoryAllocationPolicy(),
simulationContext,
- environment.platforms[0].zones[0].services[ProvisioningService.Key],
- hypervisorMonitor
+ bareMetalProvisioner
)
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
+ // Monitor hypervisor events
+ for (hypervisor in scheduler.drivers()) {
+ hypervisor.events
+ .onEach { event ->
+ when (event) {
+ is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer)
+ else -> println(event)
+ }
+ }
+ .launchIn(this)
+ }
+
+ root.newDomain(name = "failures").launch {
+ chan.receive()
+ val injectors = mutableMapOf<String, FaultInjector>()
+
+ for (node in bareMetalProvisioner.nodes()) {
+ val cluster = node.metadata[NODE_CLUSTER] as String
+ val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain) }
+ injector.enqueue(node.metadata["driver"] as FailureDomain)
+ }
+ }
+
val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList())
while (reader.hasNext()) {
val (time, workload) = reader.next()
delay(max(0, time - simulationContext.clock.millis()))
- scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory))
+ launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(
+ workload.image.name, workload.image,
+ Flavor(workload.image.cores, workload.image.requiredMemory)
+ )
+ // Monitor server events
+ server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect()
+ }
}
println(simulationContext.clock.instant())
@@ -134,6 +188,6 @@ fun main(args: Array<String>) {
}
// Explicitly close the monitor to flush its buffer
- hypervisorMonitor.close()
+ monitor.close()
}
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 0d4bd125..6f6aa616 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -34,7 +34,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
import com.atlarge.opendc.core.Environment
import com.atlarge.opendc.core.Platform
import com.atlarge.opendc.core.Zone
-import com.atlarge.opendc.core.services.ServiceRegistryImpl
+import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
@@ -77,7 +77,14 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
- SimpleBareMetalDriver(dom.newDomain("node-$counter"), UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000)))
+ SimpleBareMetalDriver(
+ dom.newDomain("node-$counter"),
+ UUID.randomUUID(),
+ "node-${counter++}",
+ emptyMap(),
+ cores,
+ listOf(MemoryUnit("", "", 2300.0, 16000))
+ )
}
}
}
@@ -89,9 +96,7 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
provisioningService.create(node)
}
- val serviceRegistry = ServiceRegistryImpl()
- serviceRegistry[ProvisioningService.Key] = provisioningService
-
+ val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
UUID.randomUUID(), "sc18-platform", listOf(
Zone(UUID.randomUUID(), "zone", serviceRegistry)
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index ae0ba550..708e27bf 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -28,6 +28,7 @@ import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel
import com.atlarge.opendc.compute.metal.service.ProvisioningService
@@ -35,7 +36,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
import com.atlarge.opendc.core.Environment
import com.atlarge.opendc.core.Platform
import com.atlarge.opendc.core.Zone
-import com.atlarge.opendc.core.services.ServiceRegistryImpl
+import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.format.environment.EnvironmentReader
import java.io.BufferedReader
import java.io.File
@@ -100,13 +101,14 @@ class Sc20ClusterEnvironmentReader(
dom.newDomain("node-$clusterId-$it"),
UUID.randomUUID(),
"node-$clusterId-$it",
+ mapOf(NODE_CLUSTER to clusterId),
List(coresPerHost) { coreId ->
ProcessingUnit(unknownProcessingNode, coreId, speed)
},
- listOf(unknownMemoryUnit),
// For now we assume a simple linear load model with an idle draw of ~200W and a maximum
// power draw of 350W.
// Source: https://stackoverflow.com/questions/6128960
+ listOf(unknownMemoryUnit),
LinearLoadPowerModel(200.0, 350.0)
)
)
@@ -119,8 +121,7 @@ class Sc20ClusterEnvironmentReader(
provisioningService.create(node)
}
- val serviceRegistry = ServiceRegistryImpl()
- serviceRegistry[ProvisioningService.Key] = provisioningService
+ val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
UUID.randomUUID(), "sc20-platform", listOf(
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index a954a308..4b5d6fb7 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -35,7 +35,7 @@ import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
import com.atlarge.opendc.core.Environment
import com.atlarge.opendc.core.Platform
import com.atlarge.opendc.core.Zone
-import com.atlarge.opendc.core.services.ServiceRegistryImpl
+import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
@@ -85,11 +85,12 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
dom.newDomain("node-$counter"),
UUID.randomUUID(),
"node-${counter++}",
+ emptyMap(),
cores,
- memories,
// For now we assume a simple linear load model with an idle draw of ~200W and a maximum
// power draw of 350W.
// Source: https://stackoverflow.com/questions/6128960
+ memories,
LinearLoadPowerModel(200.0, 350.0)
)
}
@@ -103,8 +104,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
provisioningService.create(node)
}
- val serviceRegistry = ServiceRegistryImpl()
- serviceRegistry[ProvisioningService.Key] = provisioningService
+ val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
val platform = Platform(
UUID.randomUUID(), "sc20-platform", listOf(
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
index b444f91c..1cb2de97 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
@@ -24,10 +24,9 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.workload.Job
-class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) {
+class JobState(val job: Job, val submittedAt: Long) {
/**
* A flag to indicate whether this job is finished.
*/
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 008cd1ee..7a20363c 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -25,13 +25,13 @@
package com.atlarge.opendc.workflows.service
import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.service.ProvisioningService
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy
@@ -39,6 +39,11 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import java.util.PriorityQueue
import java.util.Queue
import kotlinx.coroutines.launch
@@ -58,7 +63,7 @@ class StageWorkflowService(
taskOrderPolicy: TaskOrderPolicy,
resourceFilterPolicy: ResourceFilterPolicy,
resourceSelectionPolicy: ResourceSelectionPolicy
-) : WorkflowService, ServerMonitor {
+) : WorkflowService, CoroutineScope by domain {
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -167,6 +172,7 @@ class StageWorkflowService(
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
private val resourceFilterPolicy: ResourceFilterPolicy.Logic
private val resourceSelectionPolicy: Comparator<Node>
+ private val eventFlow = EventFlow<WorkflowEvent>()
init {
domain.launch {
@@ -183,9 +189,11 @@ class StageWorkflowService(
this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
- override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) {
+ override val events: Flow<WorkflowEvent> = eventFlow
+
+ override suspend fun submit(job: Job) = withContext(domain.coroutineContext) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, monitor, simulationContext.clock.millis())
+ val jobInstance = JobState(job, simulationContext.clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -217,6 +225,7 @@ class StageWorkflowService(
/**
* Perform a scheduling cycle immediately.
*/
+ @OptIn(ExperimentalCoroutinesApi::class)
internal suspend fun schedule() {
// J2 Create list of eligible jobs
val iterator = incomingJobs.iterator()
@@ -232,7 +241,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis())
+ eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis()))
rootListener.jobStarted(jobInstance)
}
@@ -280,10 +289,13 @@ class StageWorkflowService(
// T4 Submit task to machine
available -= host
instance.state = TaskStatus.ACTIVE
-
- val newHost = provisioningService.deploy(host, instance.task.image, this)
+ val newHost = provisioningService.deploy(host, instance.task.image)
+ val server = newHost.server!!
instance.host = newHost
- taskByServer[newHost.server!!] = instance
+ taskByServer[server] = instance
+ server.events
+ .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
+ .launchIn(this)
activeTasks += instance
taskQueue.poll()
@@ -294,12 +306,12 @@ class StageWorkflowService(
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) {
+ private suspend fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
task.startedAt = simulationContext.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
+ eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
@@ -310,7 +322,7 @@ class StageWorkflowService(
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
+ eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -335,7 +347,7 @@ class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- job.monitor.onJobFinish(job.job, simulationContext.clock.millis())
+ eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis()))
rootListener.jobFinished(job)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt
new file mode 100644
index 00000000..2ca5a19d
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt
@@ -0,0 +1,76 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+import com.atlarge.opendc.workflows.workload.Job
+import com.atlarge.opendc.workflows.workload.Task
+
+/**
+ * An event emitted by the [WorkflowService].
+ */
+public sealed class WorkflowEvent {
+ /**
+ * The [WorkflowService] that emitted the event.
+ */
+ public abstract val service: WorkflowService
+
+ /**
+ * This event is emitted when a job has become active.
+ */
+ public data class JobStarted(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a job has finished processing.
+ */
+ public data class JobFinished(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a task of a job has started processing.
+ */
+ public data class TaskStarted(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val task: Task,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a task of a job has finished processing.
+ */
+ public data class TaskFinished(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val task: Task,
+ public val time: Long
+ ) : WorkflowEvent()
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index 524f4f9e..38ea49c4 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -25,8 +25,8 @@
package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.core.services.AbstractServiceKey
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.workload.Job
+import kotlinx.coroutines.flow.Flow
import java.util.UUID
/**
@@ -36,9 +36,14 @@ import java.util.UUID
*/
public interface WorkflowService {
/**
+ * The events emitted by the workflow scheduler.
+ */
+ public val events: Flow<WorkflowEvent>
+
+ /**
* Submit the specified [Job] to the workflow service for scheduling.
*/
- public suspend fun submit(job: Job, monitor: WorkflowMonitor)
+ public suspend fun submit(job: Job)
/**
* The service key for the workflow scheduler.
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
index 40389ce2..02969d8a 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
@@ -47,4 +47,6 @@ data class Job(
override fun equals(other: Any?): Boolean = other is Job && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "Job(uid=$uid, name=$name, tasks=${tasks.size}, metadata=$metadata)"
}
diff --git a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 19e56482..5ee6d5e6 100644
--- a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -29,17 +29,16 @@ import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
-import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
@@ -64,24 +63,6 @@ internal class StageWorkflowSchedulerIntegrationTest {
var tasksStarted = 0L
var tasksFinished = 0L
- val monitor = object : WorkflowMonitor {
- override suspend fun onJobStart(job: Job, time: Long) {
- jobsStarted++
- }
-
- override suspend fun onJobFinish(job: Job, time: Long) {
- jobsFinished++
- }
-
- override suspend fun onTaskStart(job: Job, task: Task, time: Long) {
- tasksStarted++
- }
-
- override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) {
- tasksFinished++
- }
- }
-
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider(name = "sim")
@@ -104,6 +85,21 @@ internal class StageWorkflowSchedulerIntegrationTest {
}
val broker = system.newDomain(name = "broker")
+
+ broker.launch {
+ val scheduler = schedulerAsync.await()
+ scheduler.events
+ .onEach { event ->
+ when (event) {
+ is WorkflowEvent.JobStarted -> jobsStarted++
+ is WorkflowEvent.JobFinished -> jobsFinished++
+ is WorkflowEvent.TaskStarted -> tasksStarted++
+ is WorkflowEvent.TaskFinished -> tasksFinished++
+ }
+ }
+ .collect()
+ }
+
broker.launch {
val ctx = simulationContext
val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
@@ -113,7 +109,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
val (time, job) = reader.next()
jobsSubmitted++
delay(max(0, time * 1000 - ctx.clock.millis()))
- scheduler.submit(job, monitor)
+ scheduler.submit(job)
}
}