summaryrefslogtreecommitdiff
path: root/opendc-kernel-omega
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-kernel-omega')
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt37
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt5
2 files changed, 18 insertions, 24 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
index 8c1497c8..4d94bf9e 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -53,9 +53,9 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
/**
* The message queue.
*/
- private val queue: Queue<MessageContainer> = PriorityQueue(Comparator
- .comparingLong(MessageContainer::time)
- .thenComparingLong(MessageContainer::id))
+ private val queue: Queue<Envelope> = PriorityQueue(Comparator
+ .comparingLong(Envelope::time)
+ .thenComparingLong(Envelope::id))
/**
* The kernel process instance that handles internal operations during the simulation.
@@ -199,13 +199,12 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
* @property time The point in time to deliver the message.
* @property sender The sender of the message.
* @property destination The destination of the message.
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
- private data class MessageContainer(val id: Long,
- override val message: Any,
- val time: Instant,
- override val sender: Entity<*, *>?,
- override val destination: Entity<*, *>) : Envelope<Any> {
+ private data class Envelope(val id: Long,
+ val message: Any,
+ val time: Instant,
+ val sender: Entity<*, *>?,
+ val destination: Entity<*, *>) {
/**
* A flag to indicate the message has been canceled.
*/
@@ -217,7 +216,7 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
*
* @param envelope The envelope containing the message to schedule.
*/
- private fun schedule(envelope: MessageContainer) {
+ private fun schedule(envelope: Envelope) {
queue.add(envelope)
}
@@ -230,9 +229,9 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
* @param delay The time to delay the message.
*/
private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null,
- delay: Duration): MessageContainer {
+ delay: Duration): Envelope {
require(delay >= 0) { "The amount of time to delay the message must be a positive number" }
- return MessageContainer(nextId++, message, time + delay, sender, destination)
+ return Envelope(nextId++, message, time + delay, sender, destination)
}
/**
@@ -302,20 +301,16 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
/**
* The continuation to resume the execution of the process.
*/
- lateinit var continuation: Continuation<Envelope<*>>
+ lateinit var continuation: Continuation<Envelope>
/**
* The last point in time the process has done some work.
*/
var last: Instant = -1
- override suspend fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T {
- val envelope = receiveEnvelope()
- return transform(envelope, envelope.message)
- }
-
+ override suspend fun receive(): Any = receiveEnvelope().message
- override suspend fun <T> receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? {
+ override suspend fun receive(timeout: Duration): Any? {
val send = prepare(Timeout, process, process, timeout).also { schedule(it) }
try {
@@ -323,7 +318,7 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
if (received.message != Timeout) {
send.canceled = true
- return transform(received, received.message)
+ return received.message
}
return null
@@ -380,7 +375,7 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
*
* @return The envelope containing the message.
*/
- suspend fun receiveEnvelope() = suspendCoroutine<Envelope<*>> { continuation = it }
+ suspend fun receiveEnvelope() = suspendCoroutine<Envelope> { continuation = it }
.also { sender = it.sender }
// Completion continuation implementation
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
index 74fa686b..c47f9a26 100644
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -41,9 +41,8 @@ internal class SmokeTest {
override val initialState = Unit
override suspend fun Context<Unit, Unit>.run() {
while (true) {
- receive {
- sender?.send(message)
- }
+ val msg = receive()
+ sender?.send(msg)
}
}
}