From 735d5543ed72f0c6cf632b35b3f23323cebcf81b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 20 Sep 2017 11:50:20 +0200 Subject: Allow suspendable processing of message envelopes This change allows a process to suspend within a receive block. --- .../src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt | 4 ++-- .../main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt index 8faf17ab..398e9697 100644 --- a/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt +++ b/opendc-core/src/main/kotlin/nl/atlarge/opendc/kernel/messaging/Readable.kt @@ -42,7 +42,7 @@ interface Readable { * @param block The block to process the message with. * @return The processed message. */ - suspend fun receive(block: Envelope<*>.(Any) -> T): T + suspend fun receive(block: suspend Envelope<*>.(Any) -> T): T /** * Retrieve and removes a single message from the entity's mailbox, suspending the function if the mailbox is empty. @@ -55,7 +55,7 @@ interface Readable { * @param block The block to process the message with. * @return The processed message or `null` if the timeout was reached. */ - suspend fun receive(timeout: Duration, block: Envelope<*>.(Any) -> T): T? + suspend fun receive(timeout: Duration, block: suspend Envelope<*>.(Any) -> T): T? /** * Retrieve and removes a single message from the entity's mailbox, suspending the function until a message has diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt index 65d9dd60..a29f2b08 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt @@ -166,7 +166,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * @param delay The amount of time to wait before processing the message. */ override fun schedule(message: Any, destination: Entity<*>, sender: Entity<*>?, delay: Duration): Receipt { - require(delay > 0) { "The amount of time to delay the message must be a positive number" } + require(delay >= 0) { "The amount of time to delay the message must be a positive number" } val wrapped = MessageContainer(message, clock.now + delay, sender, destination) queue.add(wrapped) return wrapped @@ -257,7 +257,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * @param block The block to process the message with. * @return The processed message. */ - suspend override fun receive(block: Envelope<*>.(Any) -> T): T { + suspend override fun receive(block: suspend Envelope<*>.(Any) -> T): T { val envelope = receiveEnvelope() return block(envelope, envelope.message) } @@ -273,7 +273,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * @param block The block to process the message with. * @return The processed message or `null` if the timeout was reached. */ - suspend override fun receive(timeout: Duration, block: Envelope<*>.(Any) -> T): T? { + suspend override fun receive(timeout: Duration, block: suspend Envelope<*>.(Any) -> T): T? { val receipt = schedule(Timeout, entity, entity, timeout) val envelope = receiveEnvelope() -- cgit v1.2.3