diff options
31 files changed, 285 insertions, 827 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt deleted file mode 100644 index 1116c447..00000000 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.odcsim - -/** - * The behavior of a logical process defines how the process operates within its environments, represented as coroutine. - */ -public typealias Behavior = suspend (ctx: ProcessContext) -> Unit diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt deleted file mode 100644 index 10b4908a..00000000 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt +++ /dev/null @@ -1,70 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.odcsim - -import java.io.Serializable - -/** - * A unidirectional communication medium using message passing. Processes may send messages over a channel, and - * another process is able to receive messages sent over a channel it has a reference to (in form of a [ReceivePort]). - * - * Channels are represented by their send and receive endpoints at which processes may respectively send and receive - * messages from this channel. - * - * Channels and their respective send and receive references may be shared freely between logical processes in the same - * simulation. - */ -public interface Channel<T : Any> : Serializable { - /** - * The endpoint of the channel processes may use to send messages to. - */ - public val send: SendRef<T> - - /** - * The endpoint of the channel processes may receive messages from. - */ - public val receive: ReceiveRef<T> - - /** - * Obtain the send endpoint of the channel when unpacking the channel. See [send]. - */ - public operator fun component1(): SendRef<T> = send - - /** - * Obtain the receive endpoint of the channel when unpacking the channel. See [receive]. - */ - public operator fun component2(): ReceiveRef<T> = receive -} - -/** - * An opaque object representing a [Channel] endpoint through which logical processes can send messages over the - * channel. - */ -public interface SendRef<in T : Any> : Serializable - -/** - * An opaque object representing the receive endpoint of a [Channel]. - */ -public interface ReceiveRef<out T : Any> : Serializable diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt deleted file mode 100644 index b15ce3ae..00000000 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.odcsim - -suspend fun <T : Any, U : Any> SendRef<T>.ask(block: (SendRef<U>) -> T): U { - val ctx = processContext - val outlet = ctx.connect(this) - val channel = ctx.open<U>() - try { - outlet.send(block(channel.send)) - } finally { - outlet.close() - } - - val inlet = ctx.listen(channel.receive) - try { - return inlet.receive() - } finally { - inlet.close() - } -} - -suspend fun <T : Any> SendRef<T>.sendOnce(msg: T) { - val outlet = processContext.connect(this) - try { - outlet.send(msg) - } finally { - outlet.close() - } -} diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt index 833458e4..c850952c 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt @@ -24,22 +24,31 @@ package com.atlarge.odcsim -import java.io.Serializable +import kotlinx.coroutines.CoroutineScope /** - * A reference to a logical process in simulation. + * An isolated execution unit that runs concurrently in simulation to the other simulation domains. A domain defines a + * logical boundary between processes in simulation. */ -public interface ProcessRef : Comparable<ProcessRef>, Serializable { +public interface Domain : CoroutineScope { /** - * The name of the process. + * The name of this domain. */ public val name: String /** - * Compare [other] process ref with this process reference for order. - * - * @return a negative integer, zero, or a positive integer as this object is less than, equal to, or greater - * than the specified path. + * The parent domain to which the lifecycle of this domain is bound. In case this is a root domain, this refers to + * itself. */ - override fun compareTo(other: ProcessRef): Int = name.compareTo(other.name) + public val parent: Domain + + /** + * Construct an anonymous simulation sub-domain that is bound to the lifecycle of this domain. + */ + public fun newDomain(): Domain + + /** + * Construct a new simulation sub-domain with the specified [name]. + */ + public fun newDomain(name: String): Domain } diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt deleted file mode 100644 index 7c730866..00000000 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.odcsim - -import kotlinx.coroutines.selects.SelectClause1 - -/** - * A communication endpoint of a specific logical process through which messages pass. - * - * Ports are tied to a specific logical process and may not be shared with other processes. Doing so results in - * undefined behavior and might cause violation of time-consistency between processes. - */ -public interface Port { - /** - * Close this communication endpoint. This informs the process(es) at the other end of the port that the caller - * will not send or receive messages via this port. - * - * This is an idempotent operation – subsequent invocations of this function have no effect and return `false`. - */ - fun close(): Boolean -} - -/** - * A [Port] through which a logical process may receive messages from other [SendPort]s. - */ -public interface ReceivePort<out T : Any> : Port { - /** - * Receive a message send to this port or suspend the caller while no messages have been received at this port yet. - */ - public suspend fun receive(): T - - /** - * Clause for select expression for receiving a message from the channel. - */ - val onReceive: SelectClause1<T> -} - -/** - * A [Port] through which logical processes may send messages to a [ReceivePort]. - */ -public interface SendPort<in T : Any> : Port { - /** - * Send a message via this port to the process(es) listening at the other end of the port. - * - * Messages are send asynchronously to the receivers and do not suspend the caller. This method guarantees - * exactly-once delivery while respecting time-consistency between owner of the send port and its receivers. - */ - public fun send(message: T) -} diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt index 30ef4114..c51d1d8b 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt @@ -27,22 +27,21 @@ package com.atlarge.odcsim import java.time.Clock import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -import kotlinx.coroutines.CoroutineScope import org.slf4j.Logger /** - * Represents the execution context of a logical process in simulation. + * Represents the execution context of a simulation domain. */ -public interface ProcessContext : CoroutineContext.Element, CoroutineScope { +public interface SimulationContext : CoroutineContext.Element { /** - * Key for [ProcessContext] instance in the coroutine context. + * Key for [SimulationContext] instance in the coroutine context. */ - companion object Key : CoroutineContext.Key<ProcessContext> + companion object Key : CoroutineContext.Key<SimulationContext> /** - * The reference to the logical process of this context. + * The reference to the current simulation domain. */ - public val self: ProcessRef + public val domain: Domain /** * The clock tracking the simulation time. @@ -53,37 +52,12 @@ public interface ProcessContext : CoroutineContext.Element, CoroutineScope { * A logger instance tied to the logical process. */ public val log: Logger - - /** - * Spawn an anonymous logical process in the simulation universe with the specified [behavior]. - */ - public fun spawn(behavior: Behavior): ProcessRef - - /** - * Spawn a logical process in the simulation universe with the specified [behavior] and [name]. - */ - public fun spawn(behavior: Behavior, name: String): ProcessRef - - /** - * Open a new communication [Channel] for messages of type [T]. - */ - public fun <T : Any> open(): Channel<T> - - /** - * Create a [SendPort] for sending messages to the specified [send]. - */ - public suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> - - /** - * Create a [ReceivePort] for listening to the messages sent to the specified [receive] endpoint of a channel. - */ - public suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> } /** - * The process context of the current coroutine. + * The simulation context of the current coroutine. */ @Suppress("WRONG_MODIFIER_TARGET") -public suspend inline val processContext: ProcessContext +public suspend inline val simulationContext: SimulationContext @Suppress("ILLEGAL_SUSPEND_PROPERTY_ACCESS") - get() = coroutineContext[ProcessContext.Key] ?: throw IllegalStateException("No process context active") + get() = coroutineContext[SimulationContext] ?: throw IllegalStateException("No simulation context available") diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt index e35acbf0..db05cb1d 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt @@ -37,6 +37,16 @@ public interface SimulationEngine { public val name: String /** + * Construct an anonymous root simulation domain. + */ + public fun newDomain(): Domain + + /** + * Construct a new root simulation domain with the specified [name]. + */ + public fun newDomain(name: String): Domain + + /** * Run the simulation. */ public suspend fun run() diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt index 93dda963..a975fa3c 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt @@ -29,10 +29,7 @@ package com.atlarge.odcsim */ public interface SimulationEngineProvider { /** - * Create an [SimulationEngine] with the given root [Behavior] and the given name. - * - * @param root The behavior of the root process. - * @param name The name of the engine instance. + * Construct an [SimulationEngine] with the given [name]. */ - public operator fun invoke(root: Behavior, name: String): SimulationEngine + public operator fun invoke(name: String): SimulationEngine } diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 207d2768..4edf94d2 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -24,14 +24,8 @@ package com.atlarge.odcsim.engine.omega -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.ProcessRef -import com.atlarge.odcsim.ReceivePort -import com.atlarge.odcsim.ReceiveRef -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.SimulationEngine import com.atlarge.odcsim.engine.omega.logging.LoggerImpl import java.time.Clock @@ -39,21 +33,19 @@ import java.time.Instant import java.time.ZoneId import java.util.PriorityQueue import java.util.UUID -import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -import kotlin.coroutines.startCoroutine import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Job import kotlinx.coroutines.Runnable import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.channels.Channel as KChannel import kotlinx.coroutines.isActive -import kotlinx.coroutines.selects.SelectClause1 import org.jetbrains.annotations.Async import org.slf4j.Logger @@ -63,10 +55,9 @@ import org.slf4j.Logger * This engine implementation is a single-threaded implementation, running logical processes synchronously and * provides a single priority queue for all events (messages, ticks, etc) that occur. * - * @param rootBehavior The behavior of the root actor. * @param name The name of the engine instance. */ -class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : SimulationEngine { +public class OmegaSimulationEngine(override val name: String) : SimulationEngine { /** * The state of the actor system. */ @@ -85,12 +76,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : /** * The active processes in the simulation engine. */ - private val registry: MutableMap<ProcessRef, ProcessImpl> = HashMap() - - /** - * The channels that have been registered by this engine. - */ - private val channels: MutableSet<ChannelImpl<*>> = HashSet() + private val registry: MutableMap<String, Domain> = HashMap() /** * A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence @@ -99,29 +85,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private var nextId: Long = 0 - /** - * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior. - */ - @InternalCoroutinesApi - private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay { - override fun dispatch(context: CoroutineContext, block: Runnable) { - schedule(Event.Dispatch(clock.time, nextId++, block)) - } + override fun newDomain(): Domain = newDomain(null) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) - } - - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - val event = Event.Timeout(clock.time + timeMillis, nextId++, block) - schedule(event) - return event - } - } - - init { - spawn(rootBehavior, "/") - } + override fun newDomain(name: String): Domain = newDomain(name, null) override suspend fun run() { check(state != SimulationEngineState.TERMINATED) { "The simulation engine is terminated" } @@ -164,80 +130,71 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } /** - * Spawn a new logical process in the simulation universe. + * Spawn a new simulation domain. */ - private fun spawn(behavior: Behavior, name: String): ProcessRef { - val ref = ProcessRefImpl(this, name) - require(ref !in registry) { "Process name $name not unique" } - val lp = ProcessImpl(ref, behavior) - registry[ref] = lp - lp.start() - return ref + private fun newDomainImpl(name: String, parent: DomainImpl?): Domain { + val domain = DomainImpl(name, parent) + require(domain.path !in registry) { "Domain name $name not unique" } + registry[domain.path] = domain + return domain } - /** - * Register a new communication channel. - */ - private fun <T : Any> open(): Channel<T> { - val channel = ChannelImpl<T>() - channels += channel - return channel + private fun newDomain(parent: DomainImpl?): Domain { + val name = "$" + UUID.randomUUID() + return newDomainImpl(name, null) + } + + private fun newDomain(name: String, parent: DomainImpl?): Domain { + require(name.isNotEmpty()) { "Domain name may not be empty" } + require(!name.startsWith("$")) { "Domain name may not start with $-sign" } + require(!name.contains("/")) { "Domain name may not contain /" } + return newDomainImpl(name, parent) } - private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> { - val job = SupervisorJob() + private inner class DomainImpl(override val name: String, parent: DomainImpl?) : SimulationContext, Domain { + val job: Job = SupervisorJob(parent?.job) + val path: String = (parent?.path ?: "") + "/$name" - override val clock: Clock - get() = this@OmegaSimulationEngine.clock + @InternalCoroutinesApi + private val dispatcher = object : CoroutineDispatcher(), Delay { + // CoroutineDispatcher + override fun dispatch(context: CoroutineContext, block: Runnable) { + schedule(Event.Dispatch(clock.time, nextId++, block)) + } - override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) } + // Delay + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { + schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) + } - override fun spawn(behavior: Behavior): ProcessRef { - val name = "$" + UUID.randomUUID() - return this@OmegaSimulationEngine.spawn(behavior, name) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + val event = Event.Timeout(clock.time + timeMillis, nextId++, block) + schedule(event) + return event + } } - override fun spawn(behavior: Behavior, name: String): ProcessRef { - require(name.isNotEmpty()) { "Process name may not be empty" } - require(!name.startsWith("$")) { "Process name may not start with $-sign" } - return this@OmegaSimulationEngine.spawn(behavior, name) - } + // SimulationContext + override val key: CoroutineContext.Key<*> = SimulationContext.Key - override fun <T : Any> open(): Channel<T> = this@OmegaSimulationEngine.open() + override val domain: Domain = this - override suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> { - require(send is ChannelImpl && send in channels) { "Invalid reference to channel" } - return SendPortImpl(send) - } + override val clock: VirtualClock + get() = this@OmegaSimulationEngine.clock - override suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> { - require(receive is ChannelImpl && receive in channels) { "Invalid reference to channel" } - return ReceivePortImpl(receive) - } + override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) } - /** - * Start this logical process. - */ - fun start() = behavior.startCoroutine(this, this) - - override fun resumeWith(result: Result<Unit>) { - // Stop the logical process - if (result.isFailure) { - result.exceptionOrNull()!!.printStackTrace() - job.completeExceptionally(result.exceptionOrNull()!!) - } else { - job.complete() - } - } + override fun newDomain(): Domain = this@OmegaSimulationEngine.newDomain(this) - override val key: CoroutineContext.Key<*> = ProcessContext.Key + override fun newDomain(name: String): Domain = this@OmegaSimulationEngine.newDomain(name, this) - @InternalCoroutinesApi - override val coroutineContext: CoroutineContext - get() = context + // Domain + override val parent: Domain = parent ?: this @InternalCoroutinesApi - override val context: CoroutineContext = this + dispatcher + job + override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job + + override fun toString(): String = path } /** @@ -248,81 +205,6 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } /** - * Internal [ProcessRef] implementation for this simulation engine. - */ - private data class ProcessRefImpl( - private val owner: OmegaSimulationEngine, - override val name: String - ) : ProcessRef { - override fun toString(): String = "Process[$name]" - } - - /** - * Internal [Channel] implementation. - */ - private inner class ChannelImpl<T : Any> : Channel<T>, SendRef<T>, ReceiveRef<T> { - override val send: SendRef<T> = this - override val receive: ReceiveRef<T> = this - - /** - * The underlying `kotlinx.coroutines` channel to back this channel implementation. - */ - private val channel = KChannel<T>(KChannel.UNLIMITED) - - val onReceive: SelectClause1<T> - get() = channel.onReceive - - /** - * Receive a message from this channel. - */ - suspend fun receive(): T = channel.receive() - - /** - * Send a message to this channel. - */ - fun send(message: T) = assert(channel.offer(message)) { "Failed to send message" } - } - - private inner class SendPortImpl<T : Any>(private val channelImpl: ChannelImpl<T>) : SendPort<T> { - private var closed = false - - override fun close(): Boolean { - if (closed) { - return false - } - - closed = true - return true - } - - override fun send(message: T) { - check(!closed) { "Port is closed" } - schedule(Event.Send(clock.time, nextId++, channelImpl, message)) - } - } - - private class ReceivePortImpl<T : Any>(private val channel: ChannelImpl<T>) : ReceivePort<T> { - private var closed = false - - override fun close(): Boolean { - if (closed) { - return false - } - - closed = true - return true - } - - override val onReceive: SelectClause1<T> - get() = channel.onReceive - - override suspend fun receive(): T { - check(!closed) { "Port is closed" } - return channel.receive() - } - } - - /** * A wrapper around a message that has been scheduled for processing. * * @property time The point in time to deliver the message. @@ -361,19 +243,13 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun toString(): String = "Timeout[$time]" } - - class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) { - override fun run() { - channel.send(message) - } - } } /** * A virtual [Clock] implementation for keeping track of simulation time. */ private data class VirtualClock(var time: Long) : Clock() { - override fun withZone(zone: ZoneId?): Clock = TODO("not implemented") + override fun withZone(zone: ZoneId?): Clock = throw NotImplementedError() override fun getZone(): ZoneId = ZoneId.systemDefault() diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt index 75bb2265..5dba3233 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt @@ -24,7 +24,6 @@ package com.atlarge.odcsim.engine.omega -import com.atlarge.odcsim.Behavior import com.atlarge.odcsim.SimulationEngine import com.atlarge.odcsim.SimulationEngineProvider import java.util.ServiceLoader @@ -33,7 +32,6 @@ import java.util.ServiceLoader * An [SimulationEngineProvider] for the Omega engine, used by the [ServiceLoader] API to create * [OmegaSimulationEngine] instances. */ -class OmegaSimulationEngineProvider : SimulationEngineProvider { - override operator fun invoke(root: Behavior, name: String): SimulationEngine = - OmegaSimulationEngine(root, name) +public class OmegaSimulationEngineProvider : SimulationEngineProvider { + override operator fun invoke(name: String): SimulationEngine = OmegaSimulationEngine(name) } diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt index 5db989e8..fca4826e 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt @@ -24,7 +24,7 @@ package com.atlarge.odcsim.engine.omega.logging -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import org.slf4j.Logger import org.slf4j.Marker import org.slf4j.helpers.MessageFormatter @@ -33,11 +33,11 @@ import org.slf4j.spi.LocationAwareLogger /** * An actor-specific [Logger] implementation that is aware of the calling location. * - * @param ctx The owning [ProcessContext] of this logger. + * @param ctx The owning [SimulationContext] of this logger. * @param delegate The [LocationAwareLogger] to delegate the messages to. */ internal class LocationAwareLoggerImpl( - ctx: ProcessContext, + ctx: SimulationContext, private val delegate: LocationAwareLogger ) : LoggerImpl(ctx), Logger by delegate { /** diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt index b77e85e7..856cecfa 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt @@ -24,18 +24,18 @@ package com.atlarge.odcsim.engine.omega.logging -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import org.slf4j.Logger import org.slf4j.Marker /** * A [Logger] implementation that is not aware of the calling location. * - * @param ctx The owning [ProcessContext] of this logger. + * @param ctx The owning [SimulationContext] of this logger. * @param delegate The [Logger] to delegate the messages to. */ internal class LocationIgnorantLoggerImpl( - ctx: ProcessContext, + ctx: SimulationContext, private val delegate: Logger ) : LoggerImpl(ctx), Logger by delegate { override fun warn(marker: Marker?, format: String?, arg1: Any?, arg2: Any?) { diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt index 5a52473b..1adcfdc0 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt @@ -24,7 +24,7 @@ package com.atlarge.odcsim.engine.omega.logging -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import org.slf4j.Logger import org.slf4j.LoggerFactory import org.slf4j.MDC @@ -33,14 +33,14 @@ import org.slf4j.spi.LocationAwareLogger /** * An actor-specific [Logger] implementation. * - * @param ctx The owning [ProcessContext] of this logger. + * @param ctx The owning [SimulationContext] of this logger. */ -internal abstract class LoggerImpl internal constructor(protected val ctx: ProcessContext) : Logger { +internal abstract class LoggerImpl internal constructor(protected val ctx: SimulationContext) : Logger { /** * Configure [MDC] with actor-specific information. */ protected inline fun withMdc(block: () -> Unit) { - MDC.put(MDC_PROCESS_REF, ctx.self.name) + MDC.put(MDC_PROCESS_REF, ctx.domain.name) MDC.put(MDC_PROCESS_TIME, String.format("%d", ctx.clock.millis())) try { block() @@ -62,7 +62,7 @@ internal abstract class LoggerImpl internal constructor(protected val ctx: Proce * * @param ctx The actor context to create the logger for. */ - operator fun invoke(ctx: ProcessContext): Logger { + operator fun invoke(ctx: SimulationContext): Logger { val logger = LoggerFactory.getLogger(ctx.javaClass) return if (logger is LocationAwareLogger) { LocationAwareLoggerImpl(ctx, logger) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 4e8162ec..436f4653 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -1,6 +1,6 @@ package com.atlarge.opendc.compute.core.image -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer import kotlinx.coroutines.coroutineScope @@ -28,7 +28,7 @@ class VmImage( coroutineScope { for (cpu in ctx.cpus.take(cores)) { val usage = req / (fragment.usage * 1_000_000L) - launch { cpu.run(req, usage, processContext.clock.millis() + fragment.duration) } + launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt deleted file mode 100644 index fa9f627b..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.core.monitor - -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.processContext -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch - -/** - * Events emitted by a [Server] instance. - */ -public sealed class ServerEvent { - /** - * The server that emitted this event. - */ - public abstract val server: Server - - /** - * A response sent when the bare metal driver has been initialized. - */ - public data class StateChanged( - public override val server: Server, - public val previousState: ServerState - ) : ServerEvent() -} - -/** - * Serialize the specified [ServerMonitor] instance in order to safely send this object across logical processes. - */ -public suspend fun ServerMonitor.serialize(): ServerMonitor { - val ctx = processContext - val input = ctx.open<ServerEvent>() - - ctx.launch { - val inlet = processContext.listen(input.receive) - - while (isActive) { - when (val msg = inlet.receive()) { - is ServerEvent.StateChanged -> onUpdate(msg.server, msg.previousState) - } - } - } - - return object : ServerMonitor { - private var outlet: SendPort<ServerEvent>? = null - - override suspend fun onUpdate(server: Server, previousState: ServerState) { - if (outlet == null) { - outlet = processContext.connect(input.send) - } - - outlet!!.send(ServerEvent.StateChanged(server, previousState)) - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt deleted file mode 100644 index a8996f61..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt +++ /dev/null @@ -1,146 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.metal.driver - -import com.atlarge.odcsim.ReceivePort -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.processContext -import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.PowerState -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch - -/** - * Messages that may be sent to the management interface of a bare-metal compute [Node], similar to the - * [BareMetalDriver] interface. - */ -public sealed class NodeRequest { - /** - * Initialize the compute node. - */ - public data class Initialize(public val monitor: ServerMonitor) : NodeRequest() - - /** - * Update the power state of the compute node. - */ - public data class SetPowerState(public val state: PowerState) : NodeRequest() - - /** - * Update the boot disk image of the compute node. - */ - public data class SetImage(public val image: Image) : NodeRequest() - - /** - * Obtain the state of the compute node. - */ - public object Refresh : NodeRequest() -} - -/** - * Responses emitted by a bare-metal compute [Node]. - */ -public sealed class NodeResponse { - /** - * The node that sent this response. - */ - public abstract val node: Node - - /** - * A response sent when the bare metal driver has been initialized. - */ - public data class Initialized(public override val node: Node) : NodeResponse() - - /** - * A response sent to indicate the power state of the node changed. - */ - public data class PowerStateChanged(public override val node: Node) : NodeResponse() - - /** - * A response sent to indicate the image of a node was changed. - */ - public data class ImageChanged(public override val node: Node) : NodeResponse() - - /** - * A response sent for obtaining the refreshed [Node] instance. - */ - public data class Refreshed(public override val node: Node) : NodeResponse() -} - -/** - * Serialize the specified [BareMetalDriver] instance in order to safely send this object across logical processes. - */ -public suspend fun BareMetalDriver.serialize(): BareMetalDriver { - val ctx = processContext - val input = ctx.open<NodeRequest>() - val output = ctx.open<NodeResponse>() - - ctx.launch { - val outlet = processContext.connect(output.send) - val inlet = processContext.listen(input.receive) - - while (isActive) { - when (val msg = inlet.receive()) { - is NodeRequest.Initialize -> - outlet.send(NodeResponse.Initialized(init(msg.monitor))) - is NodeRequest.SetPowerState -> - outlet.send(NodeResponse.PowerStateChanged(setPower(msg.state))) - is NodeRequest.SetImage -> - outlet.send(NodeResponse.ImageChanged(setImage(msg.image))) - is NodeRequest.Refresh -> - outlet.send(NodeResponse.Refreshed(refresh())) - } - } - } - - return object : BareMetalDriver { - private lateinit var inlet: ReceivePort<NodeResponse> - private lateinit var outlet: SendPort<NodeRequest> - - override suspend fun init(monitor: ServerMonitor): Node { - outlet = processContext.connect(input.send) - inlet = processContext.listen(output.receive) - - outlet.send(NodeRequest.Initialize(monitor)) - return (inlet.receive() as NodeResponse.Initialized).node - } - - override suspend fun setPower(powerState: PowerState): Node { - outlet.send(NodeRequest.SetPowerState(powerState)) - return (inlet.receive() as NodeResponse.PowerStateChanged).node - } - - override suspend fun setImage(image: Image): Node { - outlet.send(NodeRequest.SetImage(image)) - return (inlet.receive() as NodeResponse.ImageChanged).node - } - - override suspend fun refresh(): Node { - outlet.send(NodeRequest.Refresh) - return (inlet.receive() as NodeResponse.Refreshed).node - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index b6d74cde..1adc8652 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.compute.metal.driver -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor @@ -39,11 +39,14 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil import kotlin.math.max import kotlin.math.min +import kotlinx.coroutines.withContext /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -51,12 +54,15 @@ import kotlin.math.min * @param uid The unique identifier of the machine. * @param name An optional name of the machine. * @param cpuNodes The CPU nodes/packages available to the bare metal machine. + * @param memoryUnits The memory units in this machine. + * @param domain The simulation domain the driver runs in. */ public class SimpleBareMetalDriver( uid: UUID, name: String, val cpuNodes: List<ProcessingUnit>, - val memoryUnits: List<MemoryUnit> + val memoryUnits: List<MemoryUnit>, + private val domain: Domain ) : BareMetalDriver { /** * The monitor to use. @@ -73,12 +79,17 @@ public class SimpleBareMetalDriver( */ private val flavor = Flavor(cpuNodes.sumBy { it.cores }, memoryUnits.map { it.size }.sum()) - override suspend fun init(monitor: ServerMonitor): Node { - this.monitor = monitor - return node + /** + * The job that is running the image. + */ + private var job: Job? = null + + override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { + this@SimpleBareMetalDriver.monitor = monitor + return@withContext node } - override suspend fun setPower(powerState: PowerState): Node { + override suspend fun setPower(powerState: PowerState): Node = withContext(domain.coroutineContext) { val previousPowerState = node.powerState val server = when (node.powerState to powerState) { PowerState.POWER_OFF to PowerState.POWER_OFF -> null @@ -100,36 +111,36 @@ public class SimpleBareMetalDriver( launch() } - return node + return@withContext node } - override suspend fun setImage(image: Image): Node { + override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { node = node.copy(image = image) - return node + return@withContext node } - override suspend fun refresh(): Node = node + override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } /** * Launch the server image on the machine. */ private suspend fun launch() { - val serverCtx = this.serverCtx + val serverContext = serverCtx - processContext.spawn { - serverCtx.init() + job = domain.launch { + serverContext.init() try { - node.server!!.image(serverCtx) - serverCtx.exit() + node.server!!.image(serverContext) + serverContext.exit() } catch (cause: Throwable) { - serverCtx.exit(cause) + serverContext.exit(cause) } } } private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext { override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - val start = processContext.clock.millis() + val start = simulationContext.clock.millis() val usage = min(maxUsage, info.clockRate) * 1_000_000 // Usage from MHz to Hz try { @@ -141,7 +152,7 @@ public class SimpleBareMetalDriver( } catch (_: CancellationException) { // On cancellation, we compute and return the remaining burst } - val end = processContext.clock.millis() + val end = simulationContext.clock.millis() val granted = ceil((end - start) / 1000.0 * usage).toLong() return max(0, burst - granted) } @@ -149,7 +160,6 @@ public class SimpleBareMetalDriver( private val serverCtx = object : ServerManagementContext { private var initialized: Boolean = false - private lateinit var ctx: ProcessContext override val cpus: List<ProcessorContextImpl> = cpuNodes .asSequence() @@ -172,7 +182,6 @@ public class SimpleBareMetalDriver( val previousState = server.state server = server.copy(state = ServerState.ACTIVE) monitor.onUpdate(server, previousState) - ctx = processContext initialized = true } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index 6b5c0979..b18a4006 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.compute.metal.service +import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image @@ -31,11 +32,12 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService : ProvisioningService, ServerMonitor { +public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, ServerMonitor { /** * The active nodes in this service. */ @@ -46,29 +48,31 @@ public class SimpleProvisioningService : ProvisioningService, ServerMonitor { */ private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node { - val node = driver.init(this) + override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { + val node = driver.init(this@SimpleProvisioningService) nodes[node] = driver - return node + return@withContext node } - override suspend fun nodes(): Set<Node> = nodes.keys + override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys } - override suspend fun refresh(node: Node): Node { - return nodes[node]!!.refresh() + override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) { + return@withContext nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node { + override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { val driver = nodes[node]!! driver.setImage(image) driver.setPower(PowerState.POWER_OFF) val newNode = driver.setPower(PowerState.POWER_ON) monitors[newNode.server!!] = monitor - return newNode + return@withContext newNode } override suspend fun onUpdate(server: Server, previousState: ServerState) { - monitors[server]?.onUpdate(server, previousState) + withContext(domain.coroutineContext) { + monitors[server]?.onUpdate(server, previousState) + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index c0d5fe0f..9745b56c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerState @@ -65,7 +65,7 @@ class HypervisorVirtDriver( val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) memoryAvailable -= requiredMemory - vms.add(VmServerContext(server, monitor, processContext)) + vms.add(VmServerContext(server, monitor, simulationContext)) return server } @@ -76,11 +76,11 @@ class HypervisorVirtDriver( internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, - ctx: ProcessContext + ctx: SimulationContext ) : ServerManagementContext { private var initialized: Boolean = false - internal val job: Job = ctx.launch { + internal val job: Job = ctx.domain.launch { init() try { server.image(this@VmServerContext) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt index 0b172c61..4cc5ac9e 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.execution.ProcessorContext @@ -91,7 +91,7 @@ public class VmSchedulerImpl( flush() val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment - val call = processContext.launch { + val call = simulationContext.domain.launch { var duration: Long = Long.MAX_VALUE var deadline: Long = Long.MAX_VALUE @@ -121,7 +121,7 @@ public class VmSchedulerImpl( // We run the total burst on the host processor. Note that this call may be cancelled at any moment in // time, so not all of the burst may be executed. val remainder = run(burst, usage, deadline) - val time = processContext.clock.millis() + val time = simulationContext.clock.millis() val totalGrantedBurst: Long = burst - remainder // Compute for each vCPU the diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index ef1528d9..888364e2 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,6 +1,6 @@ package com.atlarge.opendc.compute.virt.service -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState @@ -15,7 +15,7 @@ import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.launch class SimpleVirtProvisioningService( - private val ctx: ProcessContext, + private val ctx: SimulationContext, private val provisioningService: ProvisioningService, private val hypervisorMonitor: HypervisorMonitor ) : VirtProvisioningService, ServerMonitor { @@ -50,7 +50,7 @@ class SimpleVirtProvisioningService( internal val imagesByServer: MutableMap<Server, MutableSet<ImageView>> = mutableMapOf() init { - ctx.launch { + ctx.domain.launch { val provisionedNodes = provisioningService.nodes().toList() val deployedNodes = provisionedNodes.map { node -> val hypervisorImage = @@ -72,7 +72,7 @@ class SimpleVirtProvisioningService( } private fun requestCycle() { - ctx.launch { + ctx.domain.launch { schedule() } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index dc4f8078..6b234b73 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,14 +25,17 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.PowerState -import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import java.util.ServiceLoader import java.util.UUID @@ -43,26 +46,35 @@ internal class SimpleBareMetalDriverTest { */ @Test fun smoke() { + var finalState: ServerState = ServerState.BUILD val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ _ -> - val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) + val system = provider("sim") + val root = system.newDomain(name = "root") + root.launch { + val dom = root.newDomain(name = "driver") + val flavor = Flavor(4, 0) + val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom) + val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { - println(server) + finalState = server.state } } - val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList()) + val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) - driver.init(monitor) - driver.setImage(image) - driver.setPower(PowerState.POWER_ON) - delay(5) - println(driver.refresh()) - }, name = "sim") + // Batch driver commands + withContext(dom.coroutineContext) { + driver.init(monitor) + driver.setImage(image) + driver.setPower(PowerState.POWER_ON) + } + } runBlocking { system.run() system.terminate() } + + assertEquals(finalState, ServerState.SHUTOFF) } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index 85e3383c..3b32b3b8 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import java.util.ServiceLoader @@ -47,21 +48,25 @@ internal class SimpleProvisioningServiceTest { @Test fun smoke() { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ _ -> + val system = provider("sim") + val root = system.newDomain(name = "root") + root.launch { val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { println(server) } } - val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList()) - val provisioner = SimpleProvisioningService() + val dom = root.newDomain("provisioner") + val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom) + + val provisioner = SimpleProvisioningService(dom) provisioner.create(driver) delay(5) val nodes = provisioner.nodes() provisioner.deploy(nodes.first(), image, monitor) - }, name = "sim") + } runBlocking { system.run() diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index f59f4830..002fa175 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -25,7 +25,7 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor @@ -37,6 +37,7 @@ import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import java.util.ServiceLoader @@ -52,7 +53,10 @@ internal class HypervisorTest { @Test fun smoke() { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ _ -> + val system = provider("test") + val root = system.newDomain("root") + + root.launch { val vmm = HypervisorImage(object : HypervisorMonitor { override fun onSliceFinish( time: Long, @@ -68,10 +72,12 @@ internal class HypervisorTest { val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000, 1) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${processContext.clock.millis()}]: $server") + println("[${simulationContext.clock.millis()}]: $server") } } - val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList()) + + val driverDom = root.newDomain("driver") + val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList(), driverDom) metalDriver.init(monitor) metalDriver.setImage(vmm) @@ -82,7 +88,7 @@ internal class HypervisorTest { val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver] vmDriver.spawn(workloadA, monitor, flavor) vmDriver.spawn(workloadB, monitor, flavor) - }, name = "sim") + } runBlocking { system.run() diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 96796c07..d5e1404a 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.experiments.sc18 import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -39,12 +40,14 @@ import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task +import kotlin.math.max +import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File import java.util.ServiceLoader -import kotlin.math.max /** * Main entry point of the experiment. @@ -55,9 +58,6 @@ fun main(args: Array<String>) { return } - val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) - .use { it.read() } - var total = 0 var finished = 0 @@ -85,11 +85,16 @@ fun main(args: Array<String>) { } val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ ctx -> - println(ctx.clock.instant()) - val scheduler = StageWorkflowService( - ctx, - environment.platforms[0].zones[0].services[ProvisioningService.Key], + val system = provider(name = "sim") + + val schedulerDomain = system.newDomain(name = "scheduler") + val schedulerAsync = schedulerDomain.async { + val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) + .use { it.construct(system.newDomain("topology")) } + + StageWorkflowService( + schedulerDomain, + environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), @@ -98,8 +103,13 @@ fun main(args: Array<String>) { resourceFilterPolicy = FunctionalResourceFilterPolicy, resourceSelectionPolicy = FirstFitResourceSelectionPolicy ) + } + val broker = system.newDomain(name = "broker") + broker.launch { + val ctx = simulationContext val reader = GwfTraceReader(File(args[0])) + val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() @@ -107,11 +117,7 @@ fun main(args: Array<String>) { delay(max(0, time * 1000 - ctx.clock.millis())) scheduler.submit(job, monitor) } - - token.receive() - - println(ctx.clock.instant()) - }, name = "sim") + } runBlocking { system.run() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 3882feb7..48aca303 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState @@ -35,6 +36,7 @@ import com.atlarge.opendc.format.environment.sc20.Sc20EnvironmentReader import com.atlarge.opendc.format.trace.vm.VmTraceReader import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File import java.util.ServiceLoader @@ -48,10 +50,6 @@ fun main(args: Array<String>) { println("error: Please provide path to directory containing VM trace files") return } - - val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json")) - .use { it.read() } - val token = Channel<Boolean>() val monitor = object : ServerMonitor { @@ -61,10 +59,17 @@ fun main(args: Array<String>) { } val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ ctx -> - println(ctx.clock.instant()) + val system = provider("test") + val root = system.newDomain("root") + + root.launch { + val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json")) + .use { it.construct(root) } + + println(simulationContext.clock.instant()) + val scheduler = SimpleVirtProvisioningService( - ctx, + simulationContext, environment.platforms[0].zones[0].services[ProvisioningService.Key], Sc20HypervisorMonitor() ) @@ -73,14 +78,14 @@ fun main(args: Array<String>) { delay(1376314846 * 1000L) while (reader.hasNext()) { val (time, workload) = reader.next() - delay(max(0, time * 1000 - ctx.clock.millis())) + delay(max(0, time * 1000 - simulationContext.clock.millis())) scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory)) } token.receive() - println(ctx.clock.instant()) - }, name = "sim") + println(simulationContext.clock.instant()) + } runBlocking { system.run() diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt index 6ca53a05..42551f43 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.format.environment +import com.atlarge.odcsim.Domain import com.atlarge.opendc.core.Environment import java.io.Closeable @@ -32,7 +33,7 @@ import java.io.Closeable */ interface EnvironmentReader : Closeable { /** - * Read the description of the datacenter environment as [Environment]. + * Construct an [Environment] in the specified domain. */ - fun read(): Environment + suspend fun construct(dom: Domain): Environment } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index ac44337a..8898ddc7 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.format.environment.sc18 +import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver @@ -37,7 +38,6 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.runBlocking import java.io.InputStream import java.util.UUID @@ -52,10 +52,11 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb /** * The environment that was read from the file. */ - private val environment: Environment + private val setup: Setup = mapper.readValue(input) + + override suspend fun construct(dom: Domain): Environment { + val provisioningDomain = dom.newDomain("provisioner") - init { - val setup = mapper.readValue<Setup>(input) var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -69,18 +70,17 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000))) + SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000)), + dom.newDomain("node-$counter")) } } } } } - val provisioningService = SimpleProvisioningService() - runBlocking { - for (node in nodes) { - provisioningService.create(node) - } + val provisioningService = SimpleProvisioningService(provisioningDomain) + for (node in nodes) { + provisioningService.create(node) } val serviceRegistry = ServiceRegistryImpl() @@ -92,10 +92,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb ) ) - environment = Environment(setup.name, null, listOf(platform)) + return Environment(setup.name, null, listOf(platform)) } - override fun read(): Environment = environment - override fun close() {} } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 5eb711cc..fecba302 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.format.environment.sc20 +import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver @@ -37,7 +38,6 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.runBlocking import java.io.InputStream import java.util.UUID @@ -51,10 +51,9 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb /** * The environment that was read from the file. */ - private val environment: Environment + private val setup: Setup = mapper.readValue(input) - init { - val setup = mapper.readValue<Setup>(input) + override suspend fun construct(dom: Domain): Environment { var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -74,18 +73,16 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, memories) + SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, memories, dom.newDomain("node-$counter")) } } } } } - val provisioningService = SimpleProvisioningService() - runBlocking { - for (node in nodes) { - provisioningService.create(node) - } + val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + for (node in nodes) { + provisioningService.create(node) } val serviceRegistry = ServiceRegistryImpl() @@ -97,10 +94,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb ) ) - environment = Environment(setup.name, null, listOf(platform)) + return Environment(setup.name, null, listOf(platform)) } - override fun read(): Environment = environment - override fun close() {} } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 48f06bcd..008cd1ee 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -24,7 +24,8 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -41,13 +42,14 @@ import com.atlarge.opendc.workflows.workload.Job import java.util.PriorityQueue import java.util.Queue import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Datacenter Scheduling. */ class StageWorkflowService( - private val ctx: ProcessContext, + private val domain: Domain, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -167,7 +169,7 @@ class StageWorkflowService( private val resourceSelectionPolicy: Comparator<Node> init { - ctx.launch { + domain.launch { nodes = provisioningService.nodes().toList() available.addAll(nodes) } @@ -181,9 +183,9 @@ class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override suspend fun submit(job: Job, monitor: WorkflowMonitor) { + override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) { // J1 Incoming Jobs - val jobInstance = JobState(job, monitor, ctx.clock.millis()) + val jobInstance = JobState(job, monitor, simulationContext.clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -230,7 +232,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis()) + jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis()) rootListener.jobStarted(jobInstance) } @@ -292,23 +294,23 @@ class StageWorkflowService( } } - override suspend fun onUpdate(server: Server, previousState: ServerState) { + override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) { when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) - task.startedAt = ctx.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis()) + task.startedAt = simulationContext.clock.millis() + task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() val job = task.job task.state = TaskStatus.FINISHED - task.finishedAt = ctx.clock.millis() + task.finishedAt = simulationContext.clock.millis() job.tasks.remove(task) available += task.host!! activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis()) + job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -333,7 +335,7 @@ class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - job.monitor.onJobFinish(job.job, ctx.clock.millis()) + job.monitor.onJobFinish(job.job, simulationContext.clock.millis()) rootListener.jobFinished(job) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index cfec93b5..776f0b07 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.workflows.service.stage.StagePolicy import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -66,13 +66,13 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = processContext + val ctx = simulationContext if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. val delay = quantum - (ctx.clock.millis() % quantum) - val job = ctx.launch { + val job = ctx.domain.launch { delay(delay) next = null scheduler.schedule() @@ -93,11 +93,11 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = processContext + val ctx = simulationContext if (next == null) { val delay = random.nextInt(200).toLong() - val job = ctx.launch { + val job = ctx.domain.launch { delay(delay) next = null scheduler.schedule() |
