diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-16 20:30:17 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-28 14:28:29 +0100 |
| commit | b82e573c67f0004945aa18c575268100fb279b56 (patch) | |
| tree | 7af8e303ea9ab3821e8d8c7c1c7e9b1de058a9e7 | |
| parent | 0c19b32433e2086e72e0d22595f4daa6ef04b64b (diff) | |
refactor: Change from logical processes to simulation domains
This change moves the simulator terminology from logical processes to
simulation domains. This prevents the clash with "processes" that we are
trying to simulate.
In addition, simulation domains allows us to reduce the amount of
boilerplate and instead allows for simulation modelled using standard
techniques.
31 files changed, 285 insertions, 827 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt deleted file mode 100644 index 1116c447..00000000 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.odcsim - -/** - * The behavior of a logical process defines how the process operates within its environments, represented as coroutine. - */ -public typealias Behavior = suspend (ctx: ProcessContext) -> Unit diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt deleted file mode 100644 index 10b4908a..00000000 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt +++ /dev/null @@ -1,70 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.odcsim - -import java.io.Serializable - -/** - * A unidirectional communication medium using message passing. Processes may send messages over a channel, and - * another process is able to receive messages sent over a channel it has a reference to (in form of a [ReceivePort]). - * - * Channels are represented by their send and receive endpoints at which processes may respectively send and receive - * messages from this channel. - * - * Channels and their respective send and receive references may be shared freely between logical processes in the same - * simulation. - */ -public interface Channel<T : Any> : Serializable { - /** - * The endpoint of the channel processes may use to send messages to. - */ - public val send: SendRef<T> - - /** - * The endpoint of the channel processes may receive messages from. - */ - public val receive: ReceiveRef<T> - - /** - * Obtain the send endpoint of the channel when unpacking the channel. See [send]. - */ - public operator fun component1(): SendRef<T> = send - - /** - * Obtain the receive endpoint of the channel when unpacking the channel. See [receive]. - */ - public operator fun component2(): ReceiveRef<T> = receive -} - -/** - * An opaque object representing a [Channel] endpoint through which logical processes can send messages over the - * channel. - */ -public interface SendRef<in T : Any> : Serializable - -/** - * An opaque object representing the receive endpoint of a [Channel]. - */ -public interface ReceiveRef<out T : Any> : Serializable diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt deleted file mode 100644 index b15ce3ae..00000000 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.odcsim - -suspend fun <T : Any, U : Any> SendRef<T>.ask(block: (SendRef<U>) -> T): U { - val ctx = processContext - val outlet = ctx.connect(this) - val channel = ctx.open<U>() - try { - outlet.send(block(channel.send)) - } finally { - outlet.close() - } - - val inlet = ctx.listen(channel.receive) - try { - return inlet.receive() - } finally { - inlet.close() - } -} - -suspend fun <T : Any> SendRef<T>.sendOnce(msg: T) { - val outlet = processContext.connect(this) - try { - outlet.send(msg) - } finally { - outlet.close() - } -} diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt index 833458e4..c850952c 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt @@ -24,22 +24,31 @@ package com.atlarge.odcsim -import java.io.Serializable +import kotlinx.coroutines.CoroutineScope /** - * A reference to a logical process in simulation. + * An isolated execution unit that runs concurrently in simulation to the other simulation domains. A domain defines a + * logical boundary between processes in simulation. */ -public interface ProcessRef : Comparable<ProcessRef>, Serializable { +public interface Domain : CoroutineScope { /** - * The name of the process. + * The name of this domain. */ public val name: String /** - * Compare [other] process ref with this process reference for order. - * - * @return a negative integer, zero, or a positive integer as this object is less than, equal to, or greater - * than the specified path. + * The parent domain to which the lifecycle of this domain is bound. In case this is a root domain, this refers to + * itself. */ - override fun compareTo(other: ProcessRef): Int = name.compareTo(other.name) + public val parent: Domain + + /** + * Construct an anonymous simulation sub-domain that is bound to the lifecycle of this domain. + */ + public fun newDomain(): Domain + + /** + * Construct a new simulation sub-domain with the specified [name]. + */ + public fun newDomain(name: String): Domain } diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt deleted file mode 100644 index 7c730866..00000000 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.odcsim - -import kotlinx.coroutines.selects.SelectClause1 - -/** - * A communication endpoint of a specific logical process through which messages pass. - * - * Ports are tied to a specific logical process and may not be shared with other processes. Doing so results in - * undefined behavior and might cause violation of time-consistency between processes. - */ -public interface Port { - /** - * Close this communication endpoint. This informs the process(es) at the other end of the port that the caller - * will not send or receive messages via this port. - * - * This is an idempotent operation – subsequent invocations of this function have no effect and return `false`. - */ - fun close(): Boolean -} - -/** - * A [Port] through which a logical process may receive messages from other [SendPort]s. - */ -public interface ReceivePort<out T : Any> : Port { - /** - * Receive a message send to this port or suspend the caller while no messages have been received at this port yet. - */ - public suspend fun receive(): T - - /** - * Clause for select expression for receiving a message from the channel. - */ - val onReceive: SelectClause1<T> -} - -/** - * A [Port] through which logical processes may send messages to a [ReceivePort]. - */ -public interface SendPort<in T : Any> : Port { - /** - * Send a message via this port to the process(es) listening at the other end of the port. - * - * Messages are send asynchronously to the receivers and do not suspend the caller. This method guarantees - * exactly-once delivery while respecting time-consistency between owner of the send port and its receivers. - */ - public fun send(message: T) -} diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt index 30ef4114..c51d1d8b 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt @@ -27,22 +27,21 @@ package com.atlarge.odcsim import java.time.Clock import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -import kotlinx.coroutines.CoroutineScope import org.slf4j.Logger /** - * Represents the execution context of a logical process in simulation. + * Represents the execution context of a simulation domain. */ -public interface ProcessContext : CoroutineContext.Element, CoroutineScope { +public interface SimulationContext : CoroutineContext.Element { /** - * Key for [ProcessContext] instance in the coroutine context. + * Key for [SimulationContext] instance in the coroutine context. */ - companion object Key : CoroutineContext.Key<ProcessContext> + companion object Key : CoroutineContext.Key<SimulationContext> /** - * The reference to the logical process of this context. + * The reference to the current simulation domain. */ - public val self: ProcessRef + public val domain: Domain /** * The clock tracking the simulation time. @@ -53,37 +52,12 @@ public interface ProcessContext : CoroutineContext.Element, CoroutineScope { * A logger instance tied to the logical process. */ public val log: Logger - - /** - * Spawn an anonymous logical process in the simulation universe with the specified [behavior]. - */ - public fun spawn(behavior: Behavior): ProcessRef - - /** - * Spawn a logical process in the simulation universe with the specified [behavior] and [name]. - */ - public fun spawn(behavior: Behavior, name: String): ProcessRef - - /** - * Open a new communication [Channel] for messages of type [T]. - */ - public fun <T : Any> open(): Channel<T> - - /** - * Create a [SendPort] for sending messages to the specified [send]. - */ - public suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> - - /** - * Create a [ReceivePort] for listening to the messages sent to the specified [receive] endpoint of a channel. - */ - public suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> } /** - * The process context of the current coroutine. + * The simulation context of the current coroutine. */ @Suppress("WRONG_MODIFIER_TARGET") -public suspend inline val processContext: ProcessContext +public suspend inline val simulationContext: SimulationContext @Suppress("ILLEGAL_SUSPEND_PROPERTY_ACCESS") - get() = coroutineContext[ProcessContext.Key] ?: throw IllegalStateException("No process context active") + get() = coroutineContext[SimulationContext] ?: throw IllegalStateException("No simulation context available") diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt index e35acbf0..db05cb1d 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt @@ -37,6 +37,16 @@ public interface SimulationEngine { public val name: String /** + * Construct an anonymous root simulation domain. + */ + public fun newDomain(): Domain + + /** + * Construct a new root simulation domain with the specified [name]. + */ + public fun newDomain(name: String): Domain + + /** * Run the simulation. */ public suspend fun run() diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt index 93dda963..a975fa3c 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt @@ -29,10 +29,7 @@ package com.atlarge.odcsim */ public interface SimulationEngineProvider { /** - * Create an [SimulationEngine] with the given root [Behavior] and the given name. - * - * @param root The behavior of the root process. - * @param name The name of the engine instance. + * Construct an [SimulationEngine] with the given [name]. */ - public operator fun invoke(root: Behavior, name: String): SimulationEngine + public operator fun invoke(name: String): SimulationEngine } diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 207d2768..4edf94d2 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -24,14 +24,8 @@ package com.atlarge.odcsim.engine.omega -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.ProcessRef -import com.atlarge.odcsim.ReceivePort -import com.atlarge.odcsim.ReceiveRef -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.SimulationEngine import com.atlarge.odcsim.engine.omega.logging.LoggerImpl import java.time.Clock @@ -39,21 +33,19 @@ import java.time.Instant import java.time.ZoneId import java.util.PriorityQueue import java.util.UUID -import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -import kotlin.coroutines.startCoroutine import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Job import kotlinx.coroutines.Runnable import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.channels.Channel as KChannel import kotlinx.coroutines.isActive -import kotlinx.coroutines.selects.SelectClause1 import org.jetbrains.annotations.Async import org.slf4j.Logger @@ -63,10 +55,9 @@ import org.slf4j.Logger * This engine implementation is a single-threaded implementation, running logical processes synchronously and * provides a single priority queue for all events (messages, ticks, etc) that occur. * - * @param rootBehavior The behavior of the root actor. * @param name The name of the engine instance. */ -class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : SimulationEngine { +public class OmegaSimulationEngine(override val name: String) : SimulationEngine { /** * The state of the actor system. */ @@ -85,12 +76,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : /** * The active processes in the simulation engine. */ - private val registry: MutableMap<ProcessRef, ProcessImpl> = HashMap() - - /** - * The channels that have been registered by this engine. - */ - private val channels: MutableSet<ChannelImpl<*>> = HashSet() + private val registry: MutableMap<String, Domain> = HashMap() /** * A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence @@ -99,29 +85,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private var nextId: Long = 0 - /** - * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior. - */ - @InternalCoroutinesApi - private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay { - override fun dispatch(context: CoroutineContext, block: Runnable) { - schedule(Event.Dispatch(clock.time, nextId++, block)) - } + override fun newDomain(): Domain = newDomain(null) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) - } - - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - val event = Event.Timeout(clock.time + timeMillis, nextId++, block) - schedule(event) - return event - } - } - - init { - spawn(rootBehavior, "/") - } + override fun newDomain(name: String): Domain = newDomain(name, null) override suspend fun run() { check(state != SimulationEngineState.TERMINATED) { "The simulation engine is terminated" } @@ -164,80 +130,71 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } /** - * Spawn a new logical process in the simulation universe. + * Spawn a new simulation domain. */ - private fun spawn(behavior: Behavior, name: String): ProcessRef { - val ref = ProcessRefImpl(this, name) - require(ref !in registry) { "Process name $name not unique" } - val lp = ProcessImpl(ref, behavior) - registry[ref] = lp - lp.start() - return ref + private fun newDomainImpl(name: String, parent: DomainImpl?): Domain { + val domain = DomainImpl(name, parent) + require(domain.path !in registry) { "Domain name $name not unique" } + registry[domain.path] = domain + return domain } - /** - * Register a new communication channel. - */ - private fun <T : Any> open(): Channel<T> { - val channel = ChannelImpl<T>() - channels += channel - return channel + private fun newDomain(parent: DomainImpl?): Domain { + val name = "$" + UUID.randomUUID() + return newDomainImpl(name, null) + } + + private fun newDomain(name: String, parent: DomainImpl?): Domain { + require(name.isNotEmpty()) { "Domain name may not be empty" } + require(!name.startsWith("$")) { "Domain name may not start with $-sign" } + require(!name.contains("/")) { "Domain name may not contain /" } + return newDomainImpl(name, parent) } - private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> { - val job = SupervisorJob() + private inner class DomainImpl(override val name: String, parent: DomainImpl?) : SimulationContext, Domain { + val job: Job = SupervisorJob(parent?.job) + val path: String = (parent?.path ?: "") + "/$name" - override val clock: Clock - get() = this@OmegaSimulationEngine.clock + @InternalCoroutinesApi + private val dispatcher = object : CoroutineDispatcher(), Delay { + // CoroutineDispatcher + override fun dispatch(context: CoroutineContext, block: Runnable) { + schedule(Event.Dispatch(clock.time, nextId++, block)) + } - override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) } + // Delay + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { + schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) + } - override fun spawn(behavior: Behavior): ProcessRef { - val name = "$" + UUID.randomUUID() - return this@OmegaSimulationEngine.spawn(behavior, name) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + val event = Event.Timeout(clock.time + timeMillis, nextId++, block) + schedule(event) + return event + } } - override fun spawn(behavior: Behavior, name: String): ProcessRef { - require(name.isNotEmpty()) { "Process name may not be empty" } - require(!name.startsWith("$")) { "Process name may not start with $-sign" } - return this@OmegaSimulationEngine.spawn(behavior, name) - } + // SimulationContext + override val key: CoroutineContext.Key<*> = SimulationContext.Key - override fun <T : Any> open(): Channel<T> = this@OmegaSimulationEngine.open() + override val domain: Domain = this - override suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> { - require(send is ChannelImpl && send in channels) { "Invalid reference to channel" } - return SendPortImpl(send) - } + override val clock: VirtualClock + get() = this@OmegaSimulationEngine.clock - override suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> { - require(receive is ChannelImpl && receive in channels) { "Invalid reference to channel" } - return ReceivePortImpl(receive) - } + override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) } - /** - * Start this logical process. - */ - fun start() = behavior.startCoroutine(this, this) - - override fun resumeWith(result: Result<Unit>) { - // Stop the logical process - if (result.isFailure) { - result.exceptionOrNull()!!.printStackTrace() - job.completeExceptionally(result.exceptionOrNull()!!) - } else { - job.complete() - } - } + override fun newDomain(): Domain = this@OmegaSimulationEngine.newDomain(this) - override val key: CoroutineContext.Key<*> = ProcessContext.Key + override fun newDomain(name: String): Domain = this@OmegaSimulationEngine.newDomain(name, this) - @InternalCoroutinesApi - override val coroutineContext: CoroutineContext - get() = context + // Domain + override val parent: Domain = parent ?: this @InternalCoroutinesApi - override val context: CoroutineContext = this + dispatcher + job + override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job + + override fun toString(): String = path } /** @@ -248,81 +205,6 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } /** - * Internal [ProcessRef] implementation for this simulation engine. - */ - private data class ProcessRefImpl( - private val owner: OmegaSimulationEngine, - override val name: String - ) : ProcessRef { - override fun toString(): String = "Process[$name]" - } - - /** - * Internal [Channel] implementation. - */ - private inner class ChannelImpl<T : Any> : Channel<T>, SendRef<T>, ReceiveRef<T> { - override val send: SendRef<T> = this - override val receive: ReceiveRef<T> = this - - /** - * The underlying `kotlinx.coroutines` channel to back this channel implementation. - */ - private val channel = KChannel<T>(KChannel.UNLIMITED) - - val onReceive: SelectClause1<T> - get() = channel.onReceive - - /** - * Receive a message from this channel. - */ - suspend fun receive(): T = channel.receive() - - /** - * Send a message to this channel. - */ - fun send(message: T) = assert(channel.offer(message)) { "Failed to send message" } - } - - private inner class SendPortImpl<T : Any>(private val channelImpl: ChannelImpl<T>) : SendPort<T> { - private var closed = false - - override fun close(): Boolean { - if (closed) { - return false - } - - closed = true - return true - } - - override fun send(message: T) { - check(!closed) { "Port is closed" } - schedule(Event.Send(clock.time, nextId++, channelImpl, message)) - } - } - - private class ReceivePortImpl<T : Any>(private val channel: ChannelImpl<T>) : ReceivePort<T> { - private var closed = false - - override fun close(): Boolean { - if (closed) { - return false - } - - closed = true - return true - } - - override val onReceive: SelectClause1<T> - get() = channel.onReceive - - override suspend fun receive(): T { - check(!closed) { "Port is closed" } - return channel.receive() - } - } - - /** * A wrapper around a message that has been scheduled for processing. * * @property time The point in time to deliver the message. @@ -361,19 +243,13 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun toString(): String = "Timeout[$time]" } - - class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) { - override fun run() { - channel.send(message) - } - } } /** * A virtual [Clock] implementation for keeping track of simulation time. */ private data class VirtualClock(var time: Long) : Clock() { - override fun withZone(zone: ZoneId?): Clock = TODO("not implemented") + override fun withZone(zone: ZoneId?): Clock = throw NotImplementedError() override fun getZone(): ZoneId = ZoneId.systemDefault() diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt index 75bb2265..5dba3233 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt @@ -24,7 +24,6 @@ package com.atlarge.odcsim.engine.omega -import com.atlarge.odcsim.Behavior import com.atlarge.odcsim.SimulationEngine import com.atlarge.odcsim.SimulationEngineProvider import java.util.ServiceLoader @@ -33,7 +32,6 @@ import java.util.ServiceLoader * An [SimulationEngineProvider] for the Omega engine, used by the [ServiceLoader] API to create * [OmegaSimulationEngine] instances. */ -class OmegaSimulationEngineProvider : SimulationEngineProvider { - override operator fun invoke(root: Behavior, name: String): SimulationEngine = - OmegaSimulationEngine(root, name) +public class OmegaSimulationEngineProvider : SimulationEngineProvider { + override operator fun invoke(name: String): SimulationEngine = OmegaSimulationEngine(name) } diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt index 5db989e8..fca4826e 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt @@ -24,7 +24,7 @@ package com.atlarge.odcsim.engine.omega.logging -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import org.slf4j.Logger import org.slf4j.Marker import org.slf4j.helpers.MessageFormatter @@ -33,11 +33,11 @@ import org.slf4j.spi.LocationAwareLogger /** * An actor-specific [Logger] implementation that is aware of the calling location. * - * @param ctx The owning [ProcessContext] of this logger. + * @param ctx The owning [SimulationContext] of this logger. * @param delegate The [LocationAwareLogger] to delegate the messages to. */ internal class LocationAwareLoggerImpl( - ctx: ProcessContext, + ctx: SimulationContext, private val delegate: LocationAwareLogger ) : LoggerImpl(ctx), Logger by delegate { /** diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt index b77e85e7..856cecfa 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt @@ -24,18 +24,18 @@ package com.atlarge.odcsim.engine.omega.logging -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import org.slf4j.Logger import org.slf4j.Marker /** * A [Logger] implementation that is not aware of the calling location. * - * @param ctx The owning [ProcessContext] of this logger. + * @param ctx The owning [SimulationContext] of this logger. * @param delegate The [Logger] to delegate the messages to. */ internal class LocationIgnorantLoggerImpl( - ctx: ProcessContext, + ctx: SimulationContext, private val delegate: Logger ) : LoggerImpl(ctx), Logger by delegate { override fun warn(marker: Marker?, format: String?, arg1: Any?, arg2: Any?) { diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt index 5a52473b..1adcfdc0 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt @@ -24,7 +24,7 @@ package com.atlarge.odcsim.engine.omega.logging -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import org.slf4j.Logger import org.slf4j.LoggerFactory import org.slf4j.MDC @@ -33,14 +33,14 @@ import org.slf4j.spi.LocationAwareLogger /** * An actor-specific [Logger] implementation. * - * @param ctx The owning [ProcessContext] of this logger. + * @param ctx The owning [SimulationContext] of this logger. */ -internal abstract class LoggerImpl internal constructor(protected val ctx: ProcessContext) : Logger { +internal abstract class LoggerImpl internal constructor(protected val ctx: SimulationContext) : Logger { /** * Configure [MDC] with actor-specific information. */ protected inline fun withMdc(block: () -> Unit) { - MDC.put(MDC_PROCESS_REF, ctx.self.name) + MDC.put(MDC_PROCESS_REF, ctx.domain.name) MDC.put(MDC_PROCESS_TIME, String.format("%d", ctx.clock.millis())) try { block() @@ -62,7 +62,7 @@ internal abstract class LoggerImpl internal constructor(protected val ctx: Proce * * @param ctx The actor context to create the logger for. */ - operator fun invoke(ctx: ProcessContext): Logger { + operator fun invoke(ctx: SimulationContext): Logger { val logger = LoggerFactory.getLogger(ctx.javaClass) return if (logger is LocationAwareLogger) { LocationAwareLoggerImpl(ctx, logger) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 4e8162ec..436f4653 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -1,6 +1,6 @@ package com.atlarge.opendc.compute.core.image -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer import kotlinx.coroutines.coroutineScope @@ -28,7 +28,7 @@ class VmImage( coroutineScope { for (cpu in ctx.cpus.take(cores)) { val usage = req / (fragment.usage * 1_000_000L) - launch { cpu.run(req, usage, processContext.clock.millis() + fragment.duration) } + launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt deleted file mode 100644 index fa9f627b..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.core.monitor - -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.processContext -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch - -/** - * Events emitted by a [Server] instance. - */ -public sealed class ServerEvent { - /** - * The server that emitted this event. - */ - public abstract val server: Server - - /** - * A response sent when the bare metal driver has been initialized. - */ - public data class StateChanged( - public override val server: Server, - public val previousState: ServerState - ) : ServerEvent() -} - -/** - * Serialize the specified [ServerMonitor] instance in order to safely send this object across logical processes. - */ -public suspend fun ServerMonitor.serialize(): ServerMonitor { - val ctx = processContext - val input = ctx.open<ServerEvent>() - - ctx.launch { - val inlet = processContext.listen(input.receive) - - while (isActive) { - when (val msg = inlet.receive()) { - is ServerEvent.StateChanged -> onUpdate(msg.server, msg.previousState) - } - } - } - - return object : ServerMonitor { - private var outlet: SendPort<ServerEvent>? = null - - override suspend fun onUpdate(server: Server, previousState: ServerState) { - if (outlet == null) { - outlet = processContext.connect(input.send) - } - - outlet!!.send(ServerEvent.StateChanged(server, previousState)) - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt deleted file mode 100644 index a8996f61..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt +++ /dev/null @@ -1,146 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.metal.driver - -import com.atlarge.odcsim.ReceivePort -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.processContext -import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.PowerState -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch - -/** - * Messages that may be sent to the management interface of a bare-metal compute [Node], similar to the - * [BareMetalDriver] interface. - */ -public sealed class NodeRequest { - /** - * Initialize the compute node. - */ - public data class Initialize(public val monitor: ServerMonitor) : NodeRequest() - - /** - * Update the power state of the compute node. - */ - public data class SetPowerState(public val state: PowerState) : NodeRequest() - - /** - * Update the boot disk image of the compute node. - */ - public data class SetImage(public val image: Image) : NodeRequest() - - /** - * Obtain the state of the compute node. - */ - public object Refresh : NodeRequest() -} - -/** - * Responses emitted by a bare-metal compute [Node]. - */ -public sealed class NodeResponse { - /** - * The node that sent this response. - */ - public abstract val node: Node - - /** - * A response sent when the bare metal driver has been initialized. - */ - public data class Initialized(public override val node: Node) : NodeResponse() - - /** - * A response sent to indicate the power state of the node changed. - */ - public data class PowerStateChanged(public override val node: Node) : NodeResponse() - - /** - * A response sent to indicate the image of a node was changed. - */ - public data class ImageChanged(public override val node: Node) : NodeResponse() - - /** - * A response sent for obtaining the refreshed [Node] instance. - */ - public data class Refreshed(public override val node: Node) : NodeResponse() -} - -/** - * Serialize the specified [BareMetalDriver] instance in order to safely send this object across logical processes. - */ -public suspend fun BareMetalDriver.serialize(): BareMetalDriver { - val ctx = processContext - val input = ctx.open<NodeRequest>() - val output = ctx.open<NodeResponse>() - - ctx.launch { - val outlet = processContext.connect(output.send) - val inlet = processContext.listen(input.receive) - - while (isActive) { - when (val msg = inlet.receive()) { - is NodeRequest.Initialize -> - outlet.send(NodeResponse.Initialized(init(msg.monitor))) - is NodeRequest.SetPowerState -> - outlet.send(NodeResponse.PowerStateChanged(setPower(msg.state))) - is NodeRequest.SetImage -> - outlet.send(NodeResponse.ImageChanged(setImage(msg.image))) - is NodeRequest.Refresh -> - outlet.send(NodeResponse.Refreshed(refresh())) - } - } - } - - return object : BareMetalDriver { - private lateinit var inlet: ReceivePort<NodeResponse> - private lateinit var outlet: SendPort<NodeRequest> - - override suspend fun init(monitor: ServerMonitor): Node { - outlet = processContext.connect(input.send) - inlet = processContext.listen(output.receive) - - outlet.send(NodeRequest.Initialize(monitor)) - return (inlet.receive() as NodeResponse.Initialized).node - } - - override suspend fun setPower(powerState: PowerState): Node { - outlet.send(NodeRequest.SetPowerState(powerState)) - return (inlet.receive() as NodeResponse.PowerStateChanged).node - } - - override suspend fun setImage(image: Image): Node { - outlet.send(NodeRequest.SetImage(image)) - return (inlet.receive() as NodeResponse.ImageChanged).node - } - - override suspend fun refresh(): Node { - outlet.send(NodeRequest.Refresh) - return (inlet.receive() as NodeResponse.Refreshed).node - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index b6d74cde..1adc8652 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.compute.metal.driver -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor @@ -39,11 +39,14 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil import kotlin.math.max import kotlin.math.min +import kotlinx.coroutines.withContext /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -51,12 +54,15 @@ import kotlin.math.min * @param uid The unique identifier of the machine. * @param name An optional name of the machine. * @param cpuNodes The CPU nodes/packages available to the bare metal machine. + * @param memoryUnits The memory units in this machine. + * @param domain The simulation domain the driver runs in. */ public class SimpleBareMetalDriver( uid: UUID, name: String, val cpuNodes: List<ProcessingUnit>, - val memoryUnits: List<MemoryUnit> + val memoryUnits: List<MemoryUnit>, + private val domain: Domain ) : BareMetalDriver { /** * The monitor to use. @@ -73,12 +79,17 @@ public class SimpleBareMetalDriver( */ private val flavor = Flavor(cpuNodes.sumBy { it.cores }, memoryUnits.map { it.size }.sum()) - override suspend fun init(monitor: ServerMonitor): Node { - this.monitor = monitor - return node + /** + * The job that is running the image. + */ + private var job: Job? = null + + override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { + this@SimpleBareMetalDriver.monitor = monitor + return@withContext node } - override suspend fun setPower(powerState: PowerState): Node { + override suspend fun setPower(powerState: PowerState): Node = withContext(domain.coroutineContext) { val previousPowerState = node.powerState val server = when (node.powerState to powerState) { PowerState.POWER_OFF to PowerState.POWER_OFF -> null @@ -100,36 +111,36 @@ public class SimpleBareMetalDriver( launch() } - return node + return@withContext node } - override suspend fun setImage(image: Image): Node { + override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { node = node.copy(image = image) - return node + return@withContext node } - override suspend fun refresh(): Node = node + override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } /** * Launch the server image on the machine. */ private suspend fun launch() { - val serverCtx = this.serverCtx + val serverContext = serverCtx - processContext.spawn { - serverCtx.init() + job = domain.launch { + serverContext.init() try { - node.server!!.image(serverCtx) - serverCtx.exit() + node.server!!.image(serverContext) + serverContext.exit() } catch (cause: Throwable) { - serverCtx.exit(cause) + serverContext.exit(cause) } } } private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext { override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - val start = processContext.clock.millis() + val start = simulationContext.clock.millis() val usage = min(maxUsage, info.clockRate) * 1_000_000 // Usage from MHz to Hz try { @@ -141,7 +152,7 @@ public class SimpleBareMetalDriver( } catch (_: CancellationException) { // On cancellation, we compute and return the remaining burst } - val end = processContext.clock.millis() + val end = simulationContext.clock.millis() val granted = ceil((end - start) / 1000.0 * usage).toLong() return max(0, burst - granted) } @@ -149,7 +160,6 @@ public class SimpleBareMetalDriver( private val serverCtx = object : ServerManagementContext { private var initialized: Boolean = false - private lateinit var ctx: ProcessContext override val cpus: List<ProcessorContextImpl> = cpuNodes .asSequence() @@ -172,7 +182,6 @@ public class SimpleBareMetalDriver( val previousState = server.state server = server.copy(state = ServerState.ACTIVE) monitor.onUpdate(server, previousState) - ctx = processContext initialized = true } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index 6b5c0979..b18a4006 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.compute.metal.service +import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image @@ -31,11 +32,12 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.PowerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService : ProvisioningService, ServerMonitor { +public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, ServerMonitor { /** * The active nodes in this service. */ @@ -46,29 +48,31 @@ public class SimpleProvisioningService : ProvisioningService, ServerMonitor { */ private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node { - val node = driver.init(this) + override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { + val node = driver.init(this@SimpleProvisioningService) nodes[node] = driver - return node + return@withContext node } - override suspend fun nodes(): Set<Node> = nodes.keys + override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys } - override suspend fun refresh(node: Node): Node { - return nodes[node]!!.refresh() + override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) { + return@withContext nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node { + override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { val driver = nodes[node]!! driver.setImage(image) driver.setPower(PowerState.POWER_OFF) val newNode = driver.setPower(PowerState.POWER_ON) monitors[newNode.server!!] = monitor - return newNode + return@withContext newNode } override suspend fun onUpdate(server: Server, previousState: ServerState) { - monitors[server]?.onUpdate(server, previousState) + withContext(domain.coroutineContext) { + monitors[server]?.onUpdate(server, previousState) + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index c0d5fe0f..9745b56c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerState @@ -65,7 +65,7 @@ class HypervisorVirtDriver( val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) memoryAvailable -= requiredMemory - vms.add(VmServerContext(server, monitor, processContext)) + vms.add(VmServerContext(server, monitor, simulationContext)) return server } @@ -76,11 +76,11 @@ class HypervisorVirtDriver( internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, - ctx: ProcessContext + ctx: SimulationContext ) : ServerManagementContext { private var initialized: Boolean = false - internal val job: Job = ctx.launch { + internal val job: Job = ctx.domain.launch { init() try { server.image(this@VmServerContext) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt index 0b172c61..4cc5ac9e 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.execution.ProcessorContext @@ -91,7 +91,7 @@ public class VmSchedulerImpl( flush() val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment - val call = processContext.launch { + val call = simulationContext.domain.launch { var duration: Long = Long.MAX_VALUE var deadline: Long = Long.MAX_VALUE @@ -121,7 +121,7 @@ public class VmSchedulerImpl( // We run the total burst on the host processor. Note that this call may be cancelled at any moment in // time, so not all of the burst may be executed. val remainder = run(burst, usage, deadline) - val time = processContext.clock.millis() + val time = simulationContext.clock.millis() val totalGrantedBurst: Long = burst - remainder // Compute for each vCPU the diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index ef1528d9..888364e2 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,6 +1,6 @@ package com.atlarge.opendc.compute.virt.service -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState @@ -15,7 +15,7 @@ import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.launch class SimpleVirtProvisioningService( - private val ctx: ProcessContext, + private val ctx: SimulationContext, private val provisioningService: ProvisioningService, private val hypervisorMonitor: HypervisorMonitor ) : VirtProvisioningService, ServerMonitor { @@ -50,7 +50,7 @@ class SimpleVirtProvisioningService( internal val imagesByServer: MutableMap<Server, MutableSet<ImageView>> = mutableMapOf() init { - ctx.launch { + ctx.domain.launch { val provisionedNodes = provisioningService.nodes().toList() val deployedNodes = provisionedNodes.map { node -> val hypervisorImage = @@ -72,7 +72,7 @@ class SimpleVirtProvisioningService( } private fun requestCycle() { - ctx.launch { + ctx.domain.launch { schedule() } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index dc4f8078..6b234b73 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,14 +25,17 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.PowerState -import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import java.util.ServiceLoader import java.util.UUID @@ -43,26 +46,35 @@ internal class SimpleBareMetalDriverTest { */ @Test fun smoke() { + var finalState: ServerState = ServerState.BUILD val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ _ -> - val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) + val system = provider("sim") + val root = system.newDomain(name = "root") + root.launch { + val dom = root.newDomain(name = "driver") + val flavor = Flavor(4, 0) + val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom) + val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { - println(server) + finalState = server.state } } - val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList()) + val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) - driver.init(monitor) - driver.setImage(image) - driver.setPower(PowerState.POWER_ON) - delay(5) - println(driver.refresh()) - }, name = "sim") + // Batch driver commands + withContext(dom.coroutineContext) { + driver.init(monitor) + driver.setImage(image) + driver.setPower(PowerState.POWER_ON) + } + } runBlocking { system.run() system.terminate() } + + assertEquals(finalState, ServerState.SHUTOFF) } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index 85e3383c..3b32b3b8 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import java.util.ServiceLoader @@ -47,21 +48,25 @@ internal class SimpleProvisioningServiceTest { @Test fun smoke() { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ _ -> + val system = provider("sim") + val root = system.newDomain(name = "root") + root.launch { val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { println(server) } } - val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList()) - val provisioner = SimpleProvisioningService() + val dom = root.newDomain("provisioner") + val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom) + + val provisioner = SimpleProvisioningService(dom) provisioner.create(driver) delay(5) val nodes = provisioner.nodes() provisioner.deploy(nodes.first(), image, monitor) - }, name = "sim") + } runBlocking { system.run() diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index f59f4830..002fa175 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -25,7 +25,7 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor @@ -37,6 +37,7 @@ import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import java.util.ServiceLoader @@ -52,7 +53,10 @@ internal class HypervisorTest { @Test fun smoke() { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ _ -> + val system = provider("test") + val root = system.newDomain("root") + + root.launch { val vmm = HypervisorImage(object : HypervisorMonitor { override fun onSliceFinish( time: Long, @@ -68,10 +72,12 @@ internal class HypervisorTest { val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000, 1) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${processContext.clock.millis()}]: $server") + println("[${simulationContext.clock.millis()}]: $server") } } - val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList()) + + val driverDom = root.newDomain("driver") + val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList(), driverDom) metalDriver.init(monitor) metalDriver.setImage(vmm) @@ -82,7 +88,7 @@ internal class HypervisorTest { val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver] vmDriver.spawn(workloadA, monitor, flavor) vmDriver.spawn(workloadB, monitor, flavor) - }, name = "sim") + } runBlocking { system.run() diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 96796c07..d5e1404a 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.experiments.sc18 import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -39,12 +40,14 @@ import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task +import kotlin.math.max +import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File import java.util.ServiceLoader -import kotlin.math.max /** * Main entry point of the experiment. @@ -55,9 +58,6 @@ fun main(args: Array<String>) { return } - val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) - .use { it.read() } - var total = 0 var finished = 0 @@ -85,11 +85,16 @@ fun main(args: Array<String>) { } val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ ctx -> - println(ctx.clock.instant()) - val scheduler = StageWorkflowService( - ctx, - environment.platforms[0].zones[0].services[ProvisioningService.Key], + val system = provider(name = "sim") + + val schedulerDomain = system.newDomain(name = "scheduler") + val schedulerAsync = schedulerDomain.async { + val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) + .use { it.construct(system.newDomain("topology")) } + + StageWorkflowService( + schedulerDomain, + environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), @@ -98,8 +103,13 @@ fun main(args: Array<String>) { resourceFilterPolicy = FunctionalResourceFilterPolicy, resourceSelectionPolicy = FirstFitResourceSelectionPolicy ) + } + val broker = system.newDomain(name = "broker") + broker.launch { + val ctx = simulationContext val reader = GwfTraceReader(File(args[0])) + val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() @@ -107,11 +117,7 @@ fun main(args: Array<String>) { delay(max(0, time * 1000 - ctx.clock.millis())) scheduler.submit(job, monitor) } - - token.receive() - - println(ctx.clock.instant()) - }, name = "sim") + } runBlocking { system.run() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 3882feb7..48aca303 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.experiments.sc20 import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState @@ -35,6 +36,7 @@ import com.atlarge.opendc.format.environment.sc20.Sc20EnvironmentReader import com.atlarge.opendc.format.trace.vm.VmTraceReader import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File import java.util.ServiceLoader @@ -48,10 +50,6 @@ fun main(args: Array<String>) { println("error: Please provide path to directory containing VM trace files") return } - - val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json")) - .use { it.read() } - val token = Channel<Boolean>() val monitor = object : ServerMonitor { @@ -61,10 +59,17 @@ fun main(args: Array<String>) { } val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ ctx -> - println(ctx.clock.instant()) + val system = provider("test") + val root = system.newDomain("root") + + root.launch { + val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json")) + .use { it.construct(root) } + + println(simulationContext.clock.instant()) + val scheduler = SimpleVirtProvisioningService( - ctx, + simulationContext, environment.platforms[0].zones[0].services[ProvisioningService.Key], Sc20HypervisorMonitor() ) @@ -73,14 +78,14 @@ fun main(args: Array<String>) { delay(1376314846 * 1000L) while (reader.hasNext()) { val (time, workload) = reader.next() - delay(max(0, time * 1000 - ctx.clock.millis())) + delay(max(0, time * 1000 - simulationContext.clock.millis())) scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory)) } token.receive() - println(ctx.clock.instant()) - }, name = "sim") + println(simulationContext.clock.instant()) + } runBlocking { system.run() diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt index 6ca53a05..42551f43 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.format.environment +import com.atlarge.odcsim.Domain import com.atlarge.opendc.core.Environment import java.io.Closeable @@ -32,7 +33,7 @@ import java.io.Closeable */ interface EnvironmentReader : Closeable { /** - * Read the description of the datacenter environment as [Environment]. + * Construct an [Environment] in the specified domain. */ - fun read(): Environment + suspend fun construct(dom: Domain): Environment } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index ac44337a..8898ddc7 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.format.environment.sc18 +import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver @@ -37,7 +38,6 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.runBlocking import java.io.InputStream import java.util.UUID @@ -52,10 +52,11 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb /** * The environment that was read from the file. */ - private val environment: Environment + private val setup: Setup = mapper.readValue(input) + + override suspend fun construct(dom: Domain): Environment { + val provisioningDomain = dom.newDomain("provisioner") - init { - val setup = mapper.readValue<Setup>(input) var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -69,18 +70,17 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000))) + SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000)), + dom.newDomain("node-$counter")) } } } } } - val provisioningService = SimpleProvisioningService() - runBlocking { - for (node in nodes) { - provisioningService.create(node) - } + val provisioningService = SimpleProvisioningService(provisioningDomain) + for (node in nodes) { + provisioningService.create(node) } val serviceRegistry = ServiceRegistryImpl() @@ -92,10 +92,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb ) ) - environment = Environment(setup.name, null, listOf(platform)) + return Environment(setup.name, null, listOf(platform)) } - override fun read(): Environment = environment - override fun close() {} } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 5eb711cc..fecba302 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.format.environment.sc20 +import com.atlarge.odcsim.Domain import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver @@ -37,7 +38,6 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.runBlocking import java.io.InputStream import java.util.UUID @@ -51,10 +51,9 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb /** * The environment that was read from the file. */ - private val environment: Environment + private val setup: Setup = mapper.readValue(input) - init { - val setup = mapper.readValue<Setup>(input) + override suspend fun construct(dom: Domain): Environment { var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -74,18 +73,16 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, memories) + SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, memories, dom.newDomain("node-$counter")) } } } } } - val provisioningService = SimpleProvisioningService() - runBlocking { - for (node in nodes) { - provisioningService.create(node) - } + val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + for (node in nodes) { + provisioningService.create(node) } val serviceRegistry = ServiceRegistryImpl() @@ -97,10 +94,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb ) ) - environment = Environment(setup.name, null, listOf(platform)) + return Environment(setup.name, null, listOf(platform)) } - override fun read(): Environment = environment - override fun close() {} } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 48f06bcd..008cd1ee 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -24,7 +24,8 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -41,13 +42,14 @@ import com.atlarge.opendc.workflows.workload.Job import java.util.PriorityQueue import java.util.Queue import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Datacenter Scheduling. */ class StageWorkflowService( - private val ctx: ProcessContext, + private val domain: Domain, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -167,7 +169,7 @@ class StageWorkflowService( private val resourceSelectionPolicy: Comparator<Node> init { - ctx.launch { + domain.launch { nodes = provisioningService.nodes().toList() available.addAll(nodes) } @@ -181,9 +183,9 @@ class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override suspend fun submit(job: Job, monitor: WorkflowMonitor) { + override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) { // J1 Incoming Jobs - val jobInstance = JobState(job, monitor, ctx.clock.millis()) + val jobInstance = JobState(job, monitor, simulationContext.clock.millis()) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -230,7 +232,7 @@ class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis()) + jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis()) rootListener.jobStarted(jobInstance) } @@ -292,23 +294,23 @@ class StageWorkflowService( } } - override suspend fun onUpdate(server: Server, previousState: ServerState) { + override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) { when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) - task.startedAt = ctx.clock.millis() - task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis()) + task.startedAt = simulationContext.clock.millis() + task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis()) rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() val job = task.job task.state = TaskStatus.FINISHED - task.finishedAt = ctx.clock.millis() + task.finishedAt = simulationContext.clock.millis() job.tasks.remove(task) available += task.host!! activeTasks -= task - job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis()) + job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis()) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -333,7 +335,7 @@ class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - job.monitor.onJobFinish(job.job, ctx.clock.millis()) + job.monitor.onJobFinish(job.job, simulationContext.clock.millis()) rootListener.jobFinished(job) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index cfec93b5..776f0b07 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.processContext +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.workflows.service.stage.StagePolicy import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -66,13 +66,13 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = processContext + val ctx = simulationContext if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. val delay = quantum - (ctx.clock.millis() % quantum) - val job = ctx.launch { + val job = ctx.domain.launch { delay(delay) next = null scheduler.schedule() @@ -93,11 +93,11 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { override suspend fun requestCycle() { - val ctx = processContext + val ctx = simulationContext if (next == null) { val delay = random.nextInt(200).toLong() - val job = ctx.launch { + val job = ctx.domain.launch { delay(delay) next = null scheduler.schedule() |
