summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt30
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt70
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt52
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt (renamed from odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt)27
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt71
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt (renamed from odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt)44
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt10
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt7
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt236
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt10
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt4
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt80
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt146
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt51
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt24
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt10
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt6
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt8
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt34
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt13
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt16
-rw-r--r--opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt34
-rw-r--r--opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt25
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt5
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt24
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt21
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt26
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt10
31 files changed, 285 insertions, 827 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
deleted file mode 100644
index 1116c447..00000000
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim
-
-/**
- * The behavior of a logical process defines how the process operates within its environments, represented as coroutine.
- */
-public typealias Behavior = suspend (ctx: ProcessContext) -> Unit
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt
deleted file mode 100644
index 10b4908a..00000000
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim
-
-import java.io.Serializable
-
-/**
- * A unidirectional communication medium using message passing. Processes may send messages over a channel, and
- * another process is able to receive messages sent over a channel it has a reference to (in form of a [ReceivePort]).
- *
- * Channels are represented by their send and receive endpoints at which processes may respectively send and receive
- * messages from this channel.
- *
- * Channels and their respective send and receive references may be shared freely between logical processes in the same
- * simulation.
- */
-public interface Channel<T : Any> : Serializable {
- /**
- * The endpoint of the channel processes may use to send messages to.
- */
- public val send: SendRef<T>
-
- /**
- * The endpoint of the channel processes may receive messages from.
- */
- public val receive: ReceiveRef<T>
-
- /**
- * Obtain the send endpoint of the channel when unpacking the channel. See [send].
- */
- public operator fun component1(): SendRef<T> = send
-
- /**
- * Obtain the receive endpoint of the channel when unpacking the channel. See [receive].
- */
- public operator fun component2(): ReceiveRef<T> = receive
-}
-
-/**
- * An opaque object representing a [Channel] endpoint through which logical processes can send messages over the
- * channel.
- */
-public interface SendRef<in T : Any> : Serializable
-
-/**
- * An opaque object representing the receive endpoint of a [Channel].
- */
-public interface ReceiveRef<out T : Any> : Serializable
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt
deleted file mode 100644
index b15ce3ae..00000000
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim
-
-suspend fun <T : Any, U : Any> SendRef<T>.ask(block: (SendRef<U>) -> T): U {
- val ctx = processContext
- val outlet = ctx.connect(this)
- val channel = ctx.open<U>()
- try {
- outlet.send(block(channel.send))
- } finally {
- outlet.close()
- }
-
- val inlet = ctx.listen(channel.receive)
- try {
- return inlet.receive()
- } finally {
- inlet.close()
- }
-}
-
-suspend fun <T : Any> SendRef<T>.sendOnce(msg: T) {
- val outlet = processContext.connect(this)
- try {
- outlet.send(msg)
- } finally {
- outlet.close()
- }
-}
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt
index 833458e4..c850952c 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt
@@ -24,22 +24,31 @@
package com.atlarge.odcsim
-import java.io.Serializable
+import kotlinx.coroutines.CoroutineScope
/**
- * A reference to a logical process in simulation.
+ * An isolated execution unit that runs concurrently in simulation to the other simulation domains. A domain defines a
+ * logical boundary between processes in simulation.
*/
-public interface ProcessRef : Comparable<ProcessRef>, Serializable {
+public interface Domain : CoroutineScope {
/**
- * The name of the process.
+ * The name of this domain.
*/
public val name: String
/**
- * Compare [other] process ref with this process reference for order.
- *
- * @return a negative integer, zero, or a positive integer as this object is less than, equal to, or greater
- * than the specified path.
+ * The parent domain to which the lifecycle of this domain is bound. In case this is a root domain, this refers to
+ * itself.
*/
- override fun compareTo(other: ProcessRef): Int = name.compareTo(other.name)
+ public val parent: Domain
+
+ /**
+ * Construct an anonymous simulation sub-domain that is bound to the lifecycle of this domain.
+ */
+ public fun newDomain(): Domain
+
+ /**
+ * Construct a new simulation sub-domain with the specified [name].
+ */
+ public fun newDomain(name: String): Domain
}
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
deleted file mode 100644
index 7c730866..00000000
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim
-
-import kotlinx.coroutines.selects.SelectClause1
-
-/**
- * A communication endpoint of a specific logical process through which messages pass.
- *
- * Ports are tied to a specific logical process and may not be shared with other processes. Doing so results in
- * undefined behavior and might cause violation of time-consistency between processes.
- */
-public interface Port {
- /**
- * Close this communication endpoint. This informs the process(es) at the other end of the port that the caller
- * will not send or receive messages via this port.
- *
- * This is an idempotent operation – subsequent invocations of this function have no effect and return `false`.
- */
- fun close(): Boolean
-}
-
-/**
- * A [Port] through which a logical process may receive messages from other [SendPort]s.
- */
-public interface ReceivePort<out T : Any> : Port {
- /**
- * Receive a message send to this port or suspend the caller while no messages have been received at this port yet.
- */
- public suspend fun receive(): T
-
- /**
- * Clause for select expression for receiving a message from the channel.
- */
- val onReceive: SelectClause1<T>
-}
-
-/**
- * A [Port] through which logical processes may send messages to a [ReceivePort].
- */
-public interface SendPort<in T : Any> : Port {
- /**
- * Send a message via this port to the process(es) listening at the other end of the port.
- *
- * Messages are send asynchronously to the receivers and do not suspend the caller. This method guarantees
- * exactly-once delivery while respecting time-consistency between owner of the send port and its receivers.
- */
- public fun send(message: T)
-}
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
index 30ef4114..c51d1d8b 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
@@ -27,22 +27,21 @@ package com.atlarge.odcsim
import java.time.Clock
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
-import kotlinx.coroutines.CoroutineScope
import org.slf4j.Logger
/**
- * Represents the execution context of a logical process in simulation.
+ * Represents the execution context of a simulation domain.
*/
-public interface ProcessContext : CoroutineContext.Element, CoroutineScope {
+public interface SimulationContext : CoroutineContext.Element {
/**
- * Key for [ProcessContext] instance in the coroutine context.
+ * Key for [SimulationContext] instance in the coroutine context.
*/
- companion object Key : CoroutineContext.Key<ProcessContext>
+ companion object Key : CoroutineContext.Key<SimulationContext>
/**
- * The reference to the logical process of this context.
+ * The reference to the current simulation domain.
*/
- public val self: ProcessRef
+ public val domain: Domain
/**
* The clock tracking the simulation time.
@@ -53,37 +52,12 @@ public interface ProcessContext : CoroutineContext.Element, CoroutineScope {
* A logger instance tied to the logical process.
*/
public val log: Logger
-
- /**
- * Spawn an anonymous logical process in the simulation universe with the specified [behavior].
- */
- public fun spawn(behavior: Behavior): ProcessRef
-
- /**
- * Spawn a logical process in the simulation universe with the specified [behavior] and [name].
- */
- public fun spawn(behavior: Behavior, name: String): ProcessRef
-
- /**
- * Open a new communication [Channel] for messages of type [T].
- */
- public fun <T : Any> open(): Channel<T>
-
- /**
- * Create a [SendPort] for sending messages to the specified [send].
- */
- public suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T>
-
- /**
- * Create a [ReceivePort] for listening to the messages sent to the specified [receive] endpoint of a channel.
- */
- public suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T>
}
/**
- * The process context of the current coroutine.
+ * The simulation context of the current coroutine.
*/
@Suppress("WRONG_MODIFIER_TARGET")
-public suspend inline val processContext: ProcessContext
+public suspend inline val simulationContext: SimulationContext
@Suppress("ILLEGAL_SUSPEND_PROPERTY_ACCESS")
- get() = coroutineContext[ProcessContext.Key] ?: throw IllegalStateException("No process context active")
+ get() = coroutineContext[SimulationContext] ?: throw IllegalStateException("No simulation context available")
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt
index e35acbf0..db05cb1d 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt
@@ -37,6 +37,16 @@ public interface SimulationEngine {
public val name: String
/**
+ * Construct an anonymous root simulation domain.
+ */
+ public fun newDomain(): Domain
+
+ /**
+ * Construct a new root simulation domain with the specified [name].
+ */
+ public fun newDomain(name: String): Domain
+
+ /**
* Run the simulation.
*/
public suspend fun run()
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt
index 93dda963..a975fa3c 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt
@@ -29,10 +29,7 @@ package com.atlarge.odcsim
*/
public interface SimulationEngineProvider {
/**
- * Create an [SimulationEngine] with the given root [Behavior] and the given name.
- *
- * @param root The behavior of the root process.
- * @param name The name of the engine instance.
+ * Construct an [SimulationEngine] with the given [name].
*/
- public operator fun invoke(root: Behavior, name: String): SimulationEngine
+ public operator fun invoke(name: String): SimulationEngine
}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index 207d2768..4edf94d2 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -24,14 +24,8 @@
package com.atlarge.odcsim.engine.omega
-import com.atlarge.odcsim.Behavior
-import com.atlarge.odcsim.Channel
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.ProcessRef
-import com.atlarge.odcsim.ReceivePort
-import com.atlarge.odcsim.ReceiveRef
-import com.atlarge.odcsim.SendPort
-import com.atlarge.odcsim.SendRef
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.engine.omega.logging.LoggerImpl
import java.time.Clock
@@ -39,21 +33,19 @@ import java.time.Instant
import java.time.ZoneId
import java.util.PriorityQueue
import java.util.UUID
-import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
-import kotlin.coroutines.startCoroutine
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.Job
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.SupervisorJob
-import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
-import kotlinx.coroutines.selects.SelectClause1
import org.jetbrains.annotations.Async
import org.slf4j.Logger
@@ -63,10 +55,9 @@ import org.slf4j.Logger
* This engine implementation is a single-threaded implementation, running logical processes synchronously and
* provides a single priority queue for all events (messages, ticks, etc) that occur.
*
- * @param rootBehavior The behavior of the root actor.
* @param name The name of the engine instance.
*/
-class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : SimulationEngine {
+public class OmegaSimulationEngine(override val name: String) : SimulationEngine {
/**
* The state of the actor system.
*/
@@ -85,12 +76,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
/**
* The active processes in the simulation engine.
*/
- private val registry: MutableMap<ProcessRef, ProcessImpl> = HashMap()
-
- /**
- * The channels that have been registered by this engine.
- */
- private val channels: MutableSet<ChannelImpl<*>> = HashSet()
+ private val registry: MutableMap<String, Domain> = HashMap()
/**
* A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence
@@ -99,29 +85,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
private var nextId: Long = 0
- /**
- * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior.
- */
- @InternalCoroutinesApi
- private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, nextId++, block))
- }
+ override fun newDomain(): Domain = newDomain(null)
- override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
- }
-
- override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
- val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
- schedule(event)
- return event
- }
- }
-
- init {
- spawn(rootBehavior, "/")
- }
+ override fun newDomain(name: String): Domain = newDomain(name, null)
override suspend fun run() {
check(state != SimulationEngineState.TERMINATED) { "The simulation engine is terminated" }
@@ -164,80 +130,71 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
/**
- * Spawn a new logical process in the simulation universe.
+ * Spawn a new simulation domain.
*/
- private fun spawn(behavior: Behavior, name: String): ProcessRef {
- val ref = ProcessRefImpl(this, name)
- require(ref !in registry) { "Process name $name not unique" }
- val lp = ProcessImpl(ref, behavior)
- registry[ref] = lp
- lp.start()
- return ref
+ private fun newDomainImpl(name: String, parent: DomainImpl?): Domain {
+ val domain = DomainImpl(name, parent)
+ require(domain.path !in registry) { "Domain name $name not unique" }
+ registry[domain.path] = domain
+ return domain
}
- /**
- * Register a new communication channel.
- */
- private fun <T : Any> open(): Channel<T> {
- val channel = ChannelImpl<T>()
- channels += channel
- return channel
+ private fun newDomain(parent: DomainImpl?): Domain {
+ val name = "$" + UUID.randomUUID()
+ return newDomainImpl(name, null)
+ }
+
+ private fun newDomain(name: String, parent: DomainImpl?): Domain {
+ require(name.isNotEmpty()) { "Domain name may not be empty" }
+ require(!name.startsWith("$")) { "Domain name may not start with $-sign" }
+ require(!name.contains("/")) { "Domain name may not contain /" }
+ return newDomainImpl(name, parent)
}
- private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> {
- val job = SupervisorJob()
+ private inner class DomainImpl(override val name: String, parent: DomainImpl?) : SimulationContext, Domain {
+ val job: Job = SupervisorJob(parent?.job)
+ val path: String = (parent?.path ?: "") + "/$name"
- override val clock: Clock
- get() = this@OmegaSimulationEngine.clock
+ @InternalCoroutinesApi
+ private val dispatcher = object : CoroutineDispatcher(), Delay {
+ // CoroutineDispatcher
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ schedule(Event.Dispatch(clock.time, nextId++, block))
+ }
- override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) }
+ // Delay
+ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
+ schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
+ }
- override fun spawn(behavior: Behavior): ProcessRef {
- val name = "$" + UUID.randomUUID()
- return this@OmegaSimulationEngine.spawn(behavior, name)
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+ val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
+ schedule(event)
+ return event
+ }
}
- override fun spawn(behavior: Behavior, name: String): ProcessRef {
- require(name.isNotEmpty()) { "Process name may not be empty" }
- require(!name.startsWith("$")) { "Process name may not start with $-sign" }
- return this@OmegaSimulationEngine.spawn(behavior, name)
- }
+ // SimulationContext
+ override val key: CoroutineContext.Key<*> = SimulationContext.Key
- override fun <T : Any> open(): Channel<T> = this@OmegaSimulationEngine.open()
+ override val domain: Domain = this
- override suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> {
- require(send is ChannelImpl && send in channels) { "Invalid reference to channel" }
- return SendPortImpl(send)
- }
+ override val clock: VirtualClock
+ get() = this@OmegaSimulationEngine.clock
- override suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> {
- require(receive is ChannelImpl && receive in channels) { "Invalid reference to channel" }
- return ReceivePortImpl(receive)
- }
+ override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) }
- /**
- * Start this logical process.
- */
- fun start() = behavior.startCoroutine(this, this)
-
- override fun resumeWith(result: Result<Unit>) {
- // Stop the logical process
- if (result.isFailure) {
- result.exceptionOrNull()!!.printStackTrace()
- job.completeExceptionally(result.exceptionOrNull()!!)
- } else {
- job.complete()
- }
- }
+ override fun newDomain(): Domain = this@OmegaSimulationEngine.newDomain(this)
- override val key: CoroutineContext.Key<*> = ProcessContext.Key
+ override fun newDomain(name: String): Domain = this@OmegaSimulationEngine.newDomain(name, this)
- @InternalCoroutinesApi
- override val coroutineContext: CoroutineContext
- get() = context
+ // Domain
+ override val parent: Domain = parent ?: this
@InternalCoroutinesApi
- override val context: CoroutineContext = this + dispatcher + job
+ override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job
+
+ override fun toString(): String = path
}
/**
@@ -248,81 +205,6 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
/**
- * Internal [ProcessRef] implementation for this simulation engine.
- */
- private data class ProcessRefImpl(
- private val owner: OmegaSimulationEngine,
- override val name: String
- ) : ProcessRef {
- override fun toString(): String = "Process[$name]"
- }
-
- /**
- * Internal [Channel] implementation.
- */
- private inner class ChannelImpl<T : Any> : Channel<T>, SendRef<T>, ReceiveRef<T> {
- override val send: SendRef<T> = this
- override val receive: ReceiveRef<T> = this
-
- /**
- * The underlying `kotlinx.coroutines` channel to back this channel implementation.
- */
- private val channel = KChannel<T>(KChannel.UNLIMITED)
-
- val onReceive: SelectClause1<T>
- get() = channel.onReceive
-
- /**
- * Receive a message from this channel.
- */
- suspend fun receive(): T = channel.receive()
-
- /**
- * Send a message to this channel.
- */
- fun send(message: T) = assert(channel.offer(message)) { "Failed to send message" }
- }
-
- private inner class SendPortImpl<T : Any>(private val channelImpl: ChannelImpl<T>) : SendPort<T> {
- private var closed = false
-
- override fun close(): Boolean {
- if (closed) {
- return false
- }
-
- closed = true
- return true
- }
-
- override fun send(message: T) {
- check(!closed) { "Port is closed" }
- schedule(Event.Send(clock.time, nextId++, channelImpl, message))
- }
- }
-
- private class ReceivePortImpl<T : Any>(private val channel: ChannelImpl<T>) : ReceivePort<T> {
- private var closed = false
-
- override fun close(): Boolean {
- if (closed) {
- return false
- }
-
- closed = true
- return true
- }
-
- override val onReceive: SelectClause1<T>
- get() = channel.onReceive
-
- override suspend fun receive(): T {
- check(!closed) { "Port is closed" }
- return channel.receive()
- }
- }
-
- /**
* A wrapper around a message that has been scheduled for processing.
*
* @property time The point in time to deliver the message.
@@ -361,19 +243,13 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Timeout[$time]"
}
-
- class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) {
- override fun run() {
- channel.send(message)
- }
- }
}
/**
* A virtual [Clock] implementation for keeping track of simulation time.
*/
private data class VirtualClock(var time: Long) : Clock() {
- override fun withZone(zone: ZoneId?): Clock = TODO("not implemented")
+ override fun withZone(zone: ZoneId?): Clock = throw NotImplementedError()
override fun getZone(): ZoneId = ZoneId.systemDefault()
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
index 75bb2265..5dba3233 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
@@ -24,7 +24,6 @@
package com.atlarge.odcsim.engine.omega
-import com.atlarge.odcsim.Behavior
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.SimulationEngineProvider
import java.util.ServiceLoader
@@ -33,7 +32,6 @@ import java.util.ServiceLoader
* An [SimulationEngineProvider] for the Omega engine, used by the [ServiceLoader] API to create
* [OmegaSimulationEngine] instances.
*/
-class OmegaSimulationEngineProvider : SimulationEngineProvider {
- override operator fun invoke(root: Behavior, name: String): SimulationEngine =
- OmegaSimulationEngine(root, name)
+public class OmegaSimulationEngineProvider : SimulationEngineProvider {
+ override operator fun invoke(name: String): SimulationEngine = OmegaSimulationEngine(name)
}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
index 5db989e8..fca4826e 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.Marker
import org.slf4j.helpers.MessageFormatter
@@ -33,11 +33,11 @@ import org.slf4j.spi.LocationAwareLogger
/**
* An actor-specific [Logger] implementation that is aware of the calling location.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
* @param delegate The [LocationAwareLogger] to delegate the messages to.
*/
internal class LocationAwareLoggerImpl(
- ctx: ProcessContext,
+ ctx: SimulationContext,
private val delegate: LocationAwareLogger
) : LoggerImpl(ctx), Logger by delegate {
/**
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
index b77e85e7..856cecfa 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
@@ -24,18 +24,18 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.Marker
/**
* A [Logger] implementation that is not aware of the calling location.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
* @param delegate The [Logger] to delegate the messages to.
*/
internal class LocationIgnorantLoggerImpl(
- ctx: ProcessContext,
+ ctx: SimulationContext,
private val delegate: Logger
) : LoggerImpl(ctx), Logger by delegate {
override fun warn(marker: Marker?, format: String?, arg1: Any?, arg2: Any?) {
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
index 5a52473b..1adcfdc0 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
@@ -33,14 +33,14 @@ import org.slf4j.spi.LocationAwareLogger
/**
* An actor-specific [Logger] implementation.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
*/
-internal abstract class LoggerImpl internal constructor(protected val ctx: ProcessContext) : Logger {
+internal abstract class LoggerImpl internal constructor(protected val ctx: SimulationContext) : Logger {
/**
* Configure [MDC] with actor-specific information.
*/
protected inline fun withMdc(block: () -> Unit) {
- MDC.put(MDC_PROCESS_REF, ctx.self.name)
+ MDC.put(MDC_PROCESS_REF, ctx.domain.name)
MDC.put(MDC_PROCESS_TIME, String.format("%d", ctx.clock.millis()))
try {
block()
@@ -62,7 +62,7 @@ internal abstract class LoggerImpl internal constructor(protected val ctx: Proce
*
* @param ctx The actor context to create the logger for.
*/
- operator fun invoke(ctx: ProcessContext): Logger {
+ operator fun invoke(ctx: SimulationContext): Logger {
val logger = LoggerFactory.getLogger(ctx.javaClass)
return if (logger is LocationAwareLogger) {
LocationAwareLoggerImpl(ctx, logger)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 4e8162ec..436f4653 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -1,6 +1,6 @@
package com.atlarge.opendc.compute.core.image
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
import kotlinx.coroutines.coroutineScope
@@ -28,7 +28,7 @@ class VmImage(
coroutineScope {
for (cpu in ctx.cpus.take(cores)) {
val usage = req / (fragment.usage * 1_000_000L)
- launch { cpu.run(req, usage, processContext.clock.millis() + fragment.duration) }
+ launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) }
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt
deleted file mode 100644
index fa9f627b..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/Protocol.kt
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.compute.core.monitor
-
-import com.atlarge.odcsim.SendPort
-import com.atlarge.odcsim.processContext
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
-
-/**
- * Events emitted by a [Server] instance.
- */
-public sealed class ServerEvent {
- /**
- * The server that emitted this event.
- */
- public abstract val server: Server
-
- /**
- * A response sent when the bare metal driver has been initialized.
- */
- public data class StateChanged(
- public override val server: Server,
- public val previousState: ServerState
- ) : ServerEvent()
-}
-
-/**
- * Serialize the specified [ServerMonitor] instance in order to safely send this object across logical processes.
- */
-public suspend fun ServerMonitor.serialize(): ServerMonitor {
- val ctx = processContext
- val input = ctx.open<ServerEvent>()
-
- ctx.launch {
- val inlet = processContext.listen(input.receive)
-
- while (isActive) {
- when (val msg = inlet.receive()) {
- is ServerEvent.StateChanged -> onUpdate(msg.server, msg.previousState)
- }
- }
- }
-
- return object : ServerMonitor {
- private var outlet: SendPort<ServerEvent>? = null
-
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- if (outlet == null) {
- outlet = processContext.connect(input.send)
- }
-
- outlet!!.send(ServerEvent.StateChanged(server, previousState))
- }
- }
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt
deleted file mode 100644
index a8996f61..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/Protocol.kt
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.compute.metal.driver
-
-import com.atlarge.odcsim.ReceivePort
-import com.atlarge.odcsim.SendPort
-import com.atlarge.odcsim.processContext
-import com.atlarge.opendc.compute.core.image.Image
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
-import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.metal.PowerState
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
-
-/**
- * Messages that may be sent to the management interface of a bare-metal compute [Node], similar to the
- * [BareMetalDriver] interface.
- */
-public sealed class NodeRequest {
- /**
- * Initialize the compute node.
- */
- public data class Initialize(public val monitor: ServerMonitor) : NodeRequest()
-
- /**
- * Update the power state of the compute node.
- */
- public data class SetPowerState(public val state: PowerState) : NodeRequest()
-
- /**
- * Update the boot disk image of the compute node.
- */
- public data class SetImage(public val image: Image) : NodeRequest()
-
- /**
- * Obtain the state of the compute node.
- */
- public object Refresh : NodeRequest()
-}
-
-/**
- * Responses emitted by a bare-metal compute [Node].
- */
-public sealed class NodeResponse {
- /**
- * The node that sent this response.
- */
- public abstract val node: Node
-
- /**
- * A response sent when the bare metal driver has been initialized.
- */
- public data class Initialized(public override val node: Node) : NodeResponse()
-
- /**
- * A response sent to indicate the power state of the node changed.
- */
- public data class PowerStateChanged(public override val node: Node) : NodeResponse()
-
- /**
- * A response sent to indicate the image of a node was changed.
- */
- public data class ImageChanged(public override val node: Node) : NodeResponse()
-
- /**
- * A response sent for obtaining the refreshed [Node] instance.
- */
- public data class Refreshed(public override val node: Node) : NodeResponse()
-}
-
-/**
- * Serialize the specified [BareMetalDriver] instance in order to safely send this object across logical processes.
- */
-public suspend fun BareMetalDriver.serialize(): BareMetalDriver {
- val ctx = processContext
- val input = ctx.open<NodeRequest>()
- val output = ctx.open<NodeResponse>()
-
- ctx.launch {
- val outlet = processContext.connect(output.send)
- val inlet = processContext.listen(input.receive)
-
- while (isActive) {
- when (val msg = inlet.receive()) {
- is NodeRequest.Initialize ->
- outlet.send(NodeResponse.Initialized(init(msg.monitor)))
- is NodeRequest.SetPowerState ->
- outlet.send(NodeResponse.PowerStateChanged(setPower(msg.state)))
- is NodeRequest.SetImage ->
- outlet.send(NodeResponse.ImageChanged(setImage(msg.image)))
- is NodeRequest.Refresh ->
- outlet.send(NodeResponse.Refreshed(refresh()))
- }
- }
- }
-
- return object : BareMetalDriver {
- private lateinit var inlet: ReceivePort<NodeResponse>
- private lateinit var outlet: SendPort<NodeRequest>
-
- override suspend fun init(monitor: ServerMonitor): Node {
- outlet = processContext.connect(input.send)
- inlet = processContext.listen(output.receive)
-
- outlet.send(NodeRequest.Initialize(monitor))
- return (inlet.receive() as NodeResponse.Initialized).node
- }
-
- override suspend fun setPower(powerState: PowerState): Node {
- outlet.send(NodeRequest.SetPowerState(powerState))
- return (inlet.receive() as NodeResponse.PowerStateChanged).node
- }
-
- override suspend fun setImage(image: Image): Node {
- outlet.send(NodeRequest.SetImage(image))
- return (inlet.receive() as NodeResponse.ImageChanged).node
- }
-
- override suspend fun refresh(): Node {
- outlet.send(NodeRequest.Refresh)
- return (inlet.receive() as NodeResponse.Refreshed).node
- }
- }
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index b6d74cde..1adc8652 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.compute.metal.driver
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
@@ -39,11 +39,14 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.PowerState
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import java.util.UUID
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
+import kotlinx.coroutines.withContext
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
@@ -51,12 +54,15 @@ import kotlin.math.min
* @param uid The unique identifier of the machine.
* @param name An optional name of the machine.
* @param cpuNodes The CPU nodes/packages available to the bare metal machine.
+ * @param memoryUnits The memory units in this machine.
+ * @param domain The simulation domain the driver runs in.
*/
public class SimpleBareMetalDriver(
uid: UUID,
name: String,
val cpuNodes: List<ProcessingUnit>,
- val memoryUnits: List<MemoryUnit>
+ val memoryUnits: List<MemoryUnit>,
+ private val domain: Domain
) : BareMetalDriver {
/**
* The monitor to use.
@@ -73,12 +79,17 @@ public class SimpleBareMetalDriver(
*/
private val flavor = Flavor(cpuNodes.sumBy { it.cores }, memoryUnits.map { it.size }.sum())
- override suspend fun init(monitor: ServerMonitor): Node {
- this.monitor = monitor
- return node
+ /**
+ * The job that is running the image.
+ */
+ private var job: Job? = null
+
+ override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
+ this@SimpleBareMetalDriver.monitor = monitor
+ return@withContext node
}
- override suspend fun setPower(powerState: PowerState): Node {
+ override suspend fun setPower(powerState: PowerState): Node = withContext(domain.coroutineContext) {
val previousPowerState = node.powerState
val server = when (node.powerState to powerState) {
PowerState.POWER_OFF to PowerState.POWER_OFF -> null
@@ -100,36 +111,36 @@ public class SimpleBareMetalDriver(
launch()
}
- return node
+ return@withContext node
}
- override suspend fun setImage(image: Image): Node {
+ override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) {
node = node.copy(image = image)
- return node
+ return@withContext node
}
- override suspend fun refresh(): Node = node
+ override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node }
/**
* Launch the server image on the machine.
*/
private suspend fun launch() {
- val serverCtx = this.serverCtx
+ val serverContext = serverCtx
- processContext.spawn {
- serverCtx.init()
+ job = domain.launch {
+ serverContext.init()
try {
- node.server!!.image(serverCtx)
- serverCtx.exit()
+ node.server!!.image(serverContext)
+ serverContext.exit()
} catch (cause: Throwable) {
- serverCtx.exit(cause)
+ serverContext.exit(cause)
}
}
}
private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext {
override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long {
- val start = processContext.clock.millis()
+ val start = simulationContext.clock.millis()
val usage = min(maxUsage, info.clockRate) * 1_000_000 // Usage from MHz to Hz
try {
@@ -141,7 +152,7 @@ public class SimpleBareMetalDriver(
} catch (_: CancellationException) {
// On cancellation, we compute and return the remaining burst
}
- val end = processContext.clock.millis()
+ val end = simulationContext.clock.millis()
val granted = ceil((end - start) / 1000.0 * usage).toLong()
return max(0, burst - granted)
}
@@ -149,7 +160,6 @@ public class SimpleBareMetalDriver(
private val serverCtx = object : ServerManagementContext {
private var initialized: Boolean = false
- private lateinit var ctx: ProcessContext
override val cpus: List<ProcessorContextImpl> = cpuNodes
.asSequence()
@@ -172,7 +182,6 @@ public class SimpleBareMetalDriver(
val previousState = server.state
server = server.copy(state = ServerState.ACTIVE)
monitor.onUpdate(server, previousState)
- ctx = processContext
initialized = true
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
index 6b5c0979..b18a4006 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.compute.metal.service
+import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.Image
@@ -31,11 +32,12 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.PowerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
+import kotlinx.coroutines.withContext
/**
* A very basic implementation of the [ProvisioningService].
*/
-public class SimpleProvisioningService : ProvisioningService, ServerMonitor {
+public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, ServerMonitor {
/**
* The active nodes in this service.
*/
@@ -46,29 +48,31 @@ public class SimpleProvisioningService : ProvisioningService, ServerMonitor {
*/
private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf()
- override suspend fun create(driver: BareMetalDriver): Node {
- val node = driver.init(this)
+ override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) {
+ val node = driver.init(this@SimpleProvisioningService)
nodes[node] = driver
- return node
+ return@withContext node
}
- override suspend fun nodes(): Set<Node> = nodes.keys
+ override suspend fun nodes(): Set<Node> = withContext(domain.coroutineContext) { nodes.keys }
- override suspend fun refresh(node: Node): Node {
- return nodes[node]!!.refresh()
+ override suspend fun refresh(node: Node): Node = withContext(domain.coroutineContext) {
+ return@withContext nodes[node]!!.refresh()
}
- override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node {
+ override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) {
val driver = nodes[node]!!
driver.setImage(image)
driver.setPower(PowerState.POWER_OFF)
val newNode = driver.setPower(PowerState.POWER_ON)
monitors[newNode.server!!] = monitor
- return newNode
+ return@withContext newNode
}
override suspend fun onUpdate(server: Server, previousState: ServerState) {
- monitors[server]?.onUpdate(server, previousState)
+ withContext(domain.coroutineContext) {
+ monitors[server]?.onUpdate(server, previousState)
+ }
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index c0d5fe0f..9745b56c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -24,8 +24,8 @@
package com.atlarge.opendc.compute.virt.driver.hypervisor
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.SimulationContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerState
@@ -65,7 +65,7 @@ class HypervisorVirtDriver(
val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD)
memoryAvailable -= requiredMemory
- vms.add(VmServerContext(server, monitor, processContext))
+ vms.add(VmServerContext(server, monitor, simulationContext))
return server
}
@@ -76,11 +76,11 @@ class HypervisorVirtDriver(
internal inner class VmServerContext(
override var server: Server,
val monitor: ServerMonitor,
- ctx: ProcessContext
+ ctx: SimulationContext
) : ServerManagementContext {
private var initialized: Boolean = false
- internal val job: Job = ctx.launch {
+ internal val job: Job = ctx.domain.launch {
init()
try {
server.image(this@VmServerContext)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
index 0b172c61..4cc5ac9e 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.compute.virt.driver.hypervisor
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.execution.ProcessorContext
@@ -91,7 +91,7 @@ public class VmSchedulerImpl(
flush()
val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment
- val call = processContext.launch {
+ val call = simulationContext.domain.launch {
var duration: Long = Long.MAX_VALUE
var deadline: Long = Long.MAX_VALUE
@@ -121,7 +121,7 @@ public class VmSchedulerImpl(
// We run the total burst on the host processor. Note that this call may be cancelled at any moment in
// time, so not all of the burst may be executed.
val remainder = run(burst, usage, deadline)
- val time = processContext.clock.millis()
+ val time = simulationContext.clock.millis()
val totalGrantedBurst: Long = burst - remainder
// Compute for each vCPU the
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index ef1528d9..888364e2 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -1,6 +1,6 @@
package com.atlarge.opendc.compute.virt.service
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
@@ -15,7 +15,7 @@ import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import kotlinx.coroutines.launch
class SimpleVirtProvisioningService(
- private val ctx: ProcessContext,
+ private val ctx: SimulationContext,
private val provisioningService: ProvisioningService,
private val hypervisorMonitor: HypervisorMonitor
) : VirtProvisioningService, ServerMonitor {
@@ -50,7 +50,7 @@ class SimpleVirtProvisioningService(
internal val imagesByServer: MutableMap<Server, MutableSet<ImageView>> = mutableMapOf()
init {
- ctx.launch {
+ ctx.domain.launch {
val provisionedNodes = provisioningService.nodes().toList()
val deployedNodes = provisionedNodes.map { node ->
val hypervisorImage =
@@ -72,7 +72,7 @@ class SimpleVirtProvisioningService(
}
private fun requestCycle() {
- ctx.launch {
+ ctx.domain.launch {
schedule()
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index dc4f8078..6b234b73 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -25,14 +25,17 @@
package com.atlarge.opendc.compute.metal.driver
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.PowerState
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
+import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import java.util.ServiceLoader
import java.util.UUID
@@ -43,26 +46,35 @@ internal class SimpleBareMetalDriverTest {
*/
@Test
fun smoke() {
+ var finalState: ServerState = ServerState.BUILD
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ _ ->
- val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
+ val system = provider("sim")
+ val root = system.newDomain(name = "root")
+ root.launch {
+ val dom = root.newDomain(name = "driver")
+ val flavor = Flavor(4, 0)
+ val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom)
+
val monitor = object : ServerMonitor {
override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println(server)
+ finalState = server.state
}
}
- val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList())
+ val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
- driver.init(monitor)
- driver.setImage(image)
- driver.setPower(PowerState.POWER_ON)
- delay(5)
- println(driver.refresh())
- }, name = "sim")
+ // Batch driver commands
+ withContext(dom.coroutineContext) {
+ driver.init(monitor)
+ driver.setImage(image)
+ driver.setPower(PowerState.POWER_ON)
+ }
+ }
runBlocking {
system.run()
system.terminate()
}
+
+ assertEquals(finalState, ServerState.SHUTOFF)
}
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index 85e3383c..3b32b3b8 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import java.util.ServiceLoader
@@ -47,21 +48,25 @@ internal class SimpleProvisioningServiceTest {
@Test
fun smoke() {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ _ ->
+ val system = provider("sim")
+ val root = system.newDomain(name = "root")
+ root.launch {
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
val monitor = object : ServerMonitor {
override suspend fun onUpdate(server: Server, previousState: ServerState) {
println(server)
}
}
- val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList())
- val provisioner = SimpleProvisioningService()
+ val dom = root.newDomain("provisioner")
+ val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom)
+
+ val provisioner = SimpleProvisioningService(dom)
provisioner.create(driver)
delay(5)
val nodes = provisioner.nodes()
provisioner.deploy(nodes.first(), image, monitor)
- }, name = "sim")
+ }
runBlocking {
system.run()
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
index f59f4830..002fa175 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
@@ -25,7 +25,7 @@
package com.atlarge.opendc.compute.virt.driver.hypervisor
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
@@ -37,6 +37,7 @@ import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import java.util.ServiceLoader
@@ -52,7 +53,10 @@ internal class HypervisorTest {
@Test
fun smoke() {
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ _ ->
+ val system = provider("test")
+ val root = system.newDomain("root")
+
+ root.launch {
val vmm = HypervisorImage(object : HypervisorMonitor {
override fun onSliceFinish(
time: Long,
@@ -68,10 +72,12 @@ internal class HypervisorTest {
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000, 1)
val monitor = object : ServerMonitor {
override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println("[${processContext.clock.millis()}]: $server")
+ println("[${simulationContext.clock.millis()}]: $server")
}
}
- val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList())
+
+ val driverDom = root.newDomain("driver")
+ val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList(), driverDom)
metalDriver.init(monitor)
metalDriver.setImage(vmm)
@@ -82,7 +88,7 @@ internal class HypervisorTest {
val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver]
vmDriver.spawn(workloadA, monitor, flavor)
vmDriver.spawn(workloadB, monitor, flavor)
- }, name = "sim")
+ }
runBlocking {
system.run()
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index 96796c07..d5e1404a 100644
--- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.experiments.sc18
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -39,12 +40,14 @@ import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
import com.atlarge.opendc.workflows.workload.Task
+import kotlin.math.max
+import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
import java.util.ServiceLoader
-import kotlin.math.max
/**
* Main entry point of the experiment.
@@ -55,9 +58,6 @@ fun main(args: Array<String>) {
return
}
- val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
- .use { it.read() }
-
var total = 0
var finished = 0
@@ -85,11 +85,16 @@ fun main(args: Array<String>) {
}
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ ctx ->
- println(ctx.clock.instant())
- val scheduler = StageWorkflowService(
- ctx,
- environment.platforms[0].zones[0].services[ProvisioningService.Key],
+ val system = provider(name = "sim")
+
+ val schedulerDomain = system.newDomain(name = "scheduler")
+ val schedulerAsync = schedulerDomain.async {
+ val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
+ .use { it.construct(system.newDomain("topology")) }
+
+ StageWorkflowService(
+ schedulerDomain,
+ environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
@@ -98,8 +103,13 @@ fun main(args: Array<String>) {
resourceFilterPolicy = FunctionalResourceFilterPolicy,
resourceSelectionPolicy = FirstFitResourceSelectionPolicy
)
+ }
+ val broker = system.newDomain(name = "broker")
+ broker.launch {
+ val ctx = simulationContext
val reader = GwfTraceReader(File(args[0]))
+ val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
@@ -107,11 +117,7 @@ fun main(args: Array<String>) {
delay(max(0, time * 1000 - ctx.clock.millis()))
scheduler.submit(job, monitor)
}
-
- token.receive()
-
- println(ctx.clock.instant())
- }, name = "sim")
+ }
runBlocking {
system.run()
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 3882feb7..48aca303 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.experiments.sc20
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
@@ -35,6 +36,7 @@ import com.atlarge.opendc.format.environment.sc20.Sc20EnvironmentReader
import com.atlarge.opendc.format.trace.vm.VmTraceReader
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
import java.util.ServiceLoader
@@ -48,10 +50,6 @@ fun main(args: Array<String>) {
println("error: Please provide path to directory containing VM trace files")
return
}
-
- val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json"))
- .use { it.read() }
-
val token = Channel<Boolean>()
val monitor = object : ServerMonitor {
@@ -61,10 +59,17 @@ fun main(args: Array<String>) {
}
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ ctx ->
- println(ctx.clock.instant())
+ val system = provider("test")
+ val root = system.newDomain("root")
+
+ root.launch {
+ val environment = Sc20EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-small.json"))
+ .use { it.construct(root) }
+
+ println(simulationContext.clock.instant())
+
val scheduler = SimpleVirtProvisioningService(
- ctx,
+ simulationContext,
environment.platforms[0].zones[0].services[ProvisioningService.Key],
Sc20HypervisorMonitor()
)
@@ -73,14 +78,14 @@ fun main(args: Array<String>) {
delay(1376314846 * 1000L)
while (reader.hasNext()) {
val (time, workload) = reader.next()
- delay(max(0, time * 1000 - ctx.clock.millis()))
+ delay(max(0, time * 1000 - simulationContext.clock.millis()))
scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory))
}
token.receive()
- println(ctx.clock.instant())
- }, name = "sim")
+ println(simulationContext.clock.instant())
+ }
runBlocking {
system.run()
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
index 6ca53a05..42551f43 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.format.environment
+import com.atlarge.odcsim.Domain
import com.atlarge.opendc.core.Environment
import java.io.Closeable
@@ -32,7 +33,7 @@ import java.io.Closeable
*/
interface EnvironmentReader : Closeable {
/**
- * Read the description of the datacenter environment as [Environment].
+ * Construct an [Environment] in the specified domain.
*/
- fun read(): Environment
+ suspend fun construct(dom: Domain): Environment
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index ac44337a..8898ddc7 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.format.environment.sc18
+import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
@@ -37,7 +38,6 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import kotlinx.coroutines.runBlocking
import java.io.InputStream
import java.util.UUID
@@ -52,10 +52,11 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
/**
* The environment that was read from the file.
*/
- private val environment: Environment
+ private val setup: Setup = mapper.readValue(input)
+
+ override suspend fun construct(dom: Domain): Environment {
+ val provisioningDomain = dom.newDomain("provisioner")
- init {
- val setup = mapper.readValue<Setup>(input)
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
@@ -69,18 +70,17 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
- SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000)))
+ SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, listOf(MemoryUnit("", "", 2300.0, 16000)),
+ dom.newDomain("node-$counter"))
}
}
}
}
}
- val provisioningService = SimpleProvisioningService()
- runBlocking {
- for (node in nodes) {
- provisioningService.create(node)
- }
+ val provisioningService = SimpleProvisioningService(provisioningDomain)
+ for (node in nodes) {
+ provisioningService.create(node)
}
val serviceRegistry = ServiceRegistryImpl()
@@ -92,10 +92,8 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
)
)
- environment = Environment(setup.name, null, listOf(platform))
+ return Environment(setup.name, null, listOf(platform))
}
- override fun read(): Environment = environment
-
override fun close() {}
}
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index 5eb711cc..fecba302 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.format.environment.sc20
+import com.atlarge.odcsim.Domain
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
@@ -37,7 +38,6 @@ import com.atlarge.opendc.format.environment.EnvironmentReader
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import kotlinx.coroutines.runBlocking
import java.io.InputStream
import java.util.UUID
@@ -51,10 +51,9 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
/**
* The environment that was read from the file.
*/
- private val environment: Environment
+ private val setup: Setup = mapper.readValue(input)
- init {
- val setup = mapper.readValue<Setup>(input)
+ override suspend fun construct(dom: Domain): Environment {
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
@@ -74,18 +73,16 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
- SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, memories)
+ SimpleBareMetalDriver(UUID.randomUUID(), "node-${counter++}", cores, memories, dom.newDomain("node-$counter"))
}
}
}
}
}
- val provisioningService = SimpleProvisioningService()
- runBlocking {
- for (node in nodes) {
- provisioningService.create(node)
- }
+ val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ for (node in nodes) {
+ provisioningService.create(node)
}
val serviceRegistry = ServiceRegistryImpl()
@@ -97,10 +94,8 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
)
)
- environment = Environment(setup.name, null, listOf(platform))
+ return Environment(setup.name, null, listOf(platform))
}
- override fun read(): Environment = environment
-
override fun close() {}
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 48f06bcd..008cd1ee 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -24,7 +24,8 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
@@ -41,13 +42,14 @@ import com.atlarge.opendc.workflows.workload.Job
import java.util.PriorityQueue
import java.util.Queue
import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
* Datacenter Scheduling.
*/
class StageWorkflowService(
- private val ctx: ProcessContext,
+ private val domain: Domain,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -167,7 +169,7 @@ class StageWorkflowService(
private val resourceSelectionPolicy: Comparator<Node>
init {
- ctx.launch {
+ domain.launch {
nodes = provisioningService.nodes().toList()
available.addAll(nodes)
}
@@ -181,9 +183,9 @@ class StageWorkflowService(
this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
- override suspend fun submit(job: Job, monitor: WorkflowMonitor) {
+ override suspend fun submit(job: Job, monitor: WorkflowMonitor) = withContext(domain.coroutineContext) {
// J1 Incoming Jobs
- val jobInstance = JobState(job, monitor, ctx.clock.millis())
+ val jobInstance = JobState(job, monitor, simulationContext.clock.millis())
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -230,7 +232,7 @@ class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis())
+ jobInstance.monitor.onJobStart(jobInstance.job, simulationContext.clock.millis())
rootListener.jobStarted(jobInstance)
}
@@ -292,23 +294,23 @@ class StageWorkflowService(
}
}
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
+ override suspend fun onUpdate(server: Server, previousState: ServerState) = withContext(domain.coroutineContext) {
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
- task.startedAt = ctx.clock.millis()
- task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis())
+ task.startedAt = simulationContext.clock.millis()
+ task.job.monitor.onTaskStart(task.job.job, task.task, simulationContext.clock.millis())
rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
val job = task.job
task.state = TaskStatus.FINISHED
- task.finishedAt = ctx.clock.millis()
+ task.finishedAt = simulationContext.clock.millis()
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis())
+ job.monitor.onTaskFinish(job.job, task.task, 0, simulationContext.clock.millis())
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -333,7 +335,7 @@ class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- job.monitor.onJobFinish(job.job, ctx.clock.millis())
+ job.monitor.onJobFinish(job.job, simulationContext.clock.millis())
rootListener.jobFinished(job)
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index cfec93b5..776f0b07 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.processContext
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.workflows.service.stage.StagePolicy
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -66,13 +66,13 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = processContext
+ val ctx = simulationContext
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
val delay = quantum - (ctx.clock.millis() % quantum)
- val job = ctx.launch {
+ val job = ctx.domain.launch {
delay(delay)
next = null
scheduler.schedule()
@@ -93,11 +93,11 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
override suspend fun requestCycle() {
- val ctx = processContext
+ val ctx = simulationContext
if (next == null) {
val delay = random.nextInt(200).toLong()
- val job = ctx.launch {
+ val job = ctx.domain.launch {
delay(delay)
next = null
scheduler.schedule()