diff options
Diffstat (limited to 'opendc-kernel-omega')
| -rw-r--r-- | opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 37 | ||||
| -rw-r--r-- | opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 5 |
2 files changed, 18 insertions, 24 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 8c1497c8..4d94bf9e 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -53,9 +53,9 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot /** * The message queue. */ - private val queue: Queue<MessageContainer> = PriorityQueue(Comparator - .comparingLong(MessageContainer::time) - .thenComparingLong(MessageContainer::id)) + private val queue: Queue<Envelope> = PriorityQueue(Comparator + .comparingLong(Envelope::time) + .thenComparingLong(Envelope::id)) /** * The kernel process instance that handles internal operations during the simulation. @@ -199,13 +199,12 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot * @property time The point in time to deliver the message. * @property sender The sender of the message. * @property destination The destination of the message. - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ - private data class MessageContainer(val id: Long, - override val message: Any, - val time: Instant, - override val sender: Entity<*, *>?, - override val destination: Entity<*, *>) : Envelope<Any> { + private data class Envelope(val id: Long, + val message: Any, + val time: Instant, + val sender: Entity<*, *>?, + val destination: Entity<*, *>) { /** * A flag to indicate the message has been canceled. */ @@ -217,7 +216,7 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot * * @param envelope The envelope containing the message to schedule. */ - private fun schedule(envelope: MessageContainer) { + private fun schedule(envelope: Envelope) { queue.add(envelope) } @@ -230,9 +229,9 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot * @param delay The time to delay the message. */ private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, - delay: Duration): MessageContainer { + delay: Duration): Envelope { require(delay >= 0) { "The amount of time to delay the message must be a positive number" } - return MessageContainer(nextId++, message, time + delay, sender, destination) + return Envelope(nextId++, message, time + delay, sender, destination) } /** @@ -302,20 +301,16 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot /** * The continuation to resume the execution of the process. */ - lateinit var continuation: Continuation<Envelope<*>> + lateinit var continuation: Continuation<Envelope> /** * The last point in time the process has done some work. */ var last: Instant = -1 - override suspend fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T { - val envelope = receiveEnvelope() - return transform(envelope, envelope.message) - } - + override suspend fun receive(): Any = receiveEnvelope().message - override suspend fun <T> receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { + override suspend fun receive(timeout: Duration): Any? { val send = prepare(Timeout, process, process, timeout).also { schedule(it) } try { @@ -323,7 +318,7 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot if (received.message != Timeout) { send.canceled = true - return transform(received, received.message) + return received.message } return null @@ -380,7 +375,7 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot * * @return The envelope containing the message. */ - suspend fun receiveEnvelope() = suspendCoroutine<Envelope<*>> { continuation = it } + suspend fun receiveEnvelope() = suspendCoroutine<Envelope> { continuation = it } .also { sender = it.sender } // Completion continuation implementation diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index 74fa686b..c47f9a26 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -41,9 +41,8 @@ internal class SmokeTest { override val initialState = Unit override suspend fun Context<Unit, Unit>.run() { while (true) { - receive { - sender?.send(message) - } + val msg = receive() + sender?.send(msg) } } } |
