From 1e6d752d2186987e27059eececf951f68ce98977 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Feb 2020 13:45:19 +0100 Subject: bug: Guarantee FIFO order of messages in the queue This change fixes the issue where messages are not delivered in FIFO order due to the internal priority not guaranteeing insertion order. For now, we fix this issue by adding a unique increasing identifier to each event in the queue. --- .../odcsim/engine/omega/OmegaSimulationEngine.kt | 42 ++++++++++++++-------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 34e5fd9a..767e139a 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -46,8 +46,10 @@ import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Runnable +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.channels.Channel as KChannel import kotlinx.coroutines.isActive import kotlinx.coroutines.selects.SelectClause1 @@ -88,21 +90,23 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private val channels: MutableSet> = HashSet() + private var nextId: Long = 0 + /** * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior. */ @InternalCoroutinesApi private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay { override fun dispatch(context: CoroutineContext, block: Runnable) { - schedule(Event.Dispatch(clock.time, block)) + schedule(Event.Dispatch(clock.time, nextId++, block)) } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - schedule(Event.Resume(clock.time + timeMillis, this, continuation)) + schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) } override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - val event = Event.Timeout(clock.time + timeMillis, block) + val event = Event.Timeout(clock.time + timeMillis, nextId++, block) schedule(event) return event } @@ -174,12 +178,14 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation { + val job = SupervisorJob() + override val clock: Clock get() = this@OmegaSimulationEngine.clock override fun spawn(behavior: Behavior): ProcessRef { val name = "$" + UUID.randomUUID() - return spawn(behavior, name) + return this@OmegaSimulationEngine.spawn(behavior, name) } override fun spawn(behavior: Behavior, name: String): ProcessRef { @@ -209,13 +215,16 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : // Stop the logical process if (result.isFailure) { result.exceptionOrNull()!!.printStackTrace() + job.completeExceptionally(result.exceptionOrNull()!!) + } else { + job.complete() } } override val key: CoroutineContext.Key<*> = ProcessContext.Key @InternalCoroutinesApi - override val context: CoroutineContext = this + dispatcher + override val context: CoroutineContext = this + dispatcher + job } /** @@ -245,7 +254,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : /** * The underlying `kotlinx.coroutines` channel to back this channel implementation. */ - private val channel = KChannel(KChannel.CONFLATED) + private val channel = KChannel(KChannel.UNLIMITED) val onReceive: SelectClause1 get() = channel.onReceive @@ -275,7 +284,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun send(message: T) { check(!closed) { "Port is closed" } - schedule(Event.Send(clock.time, channelImpl, message)) + schedule(Event.Send(clock.time, nextId++, channelImpl, message)) } } @@ -305,17 +314,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : * * @property time The point in time to deliver the message. */ - private sealed class Event(val time: Long) : Comparable, Runnable { - override fun compareTo(other: Event): Int = time.compareTo(other.time) + private sealed class Event(val time: Long, val id: Long) : Comparable, Runnable { + override fun compareTo(other: Event): Int { + val cmp = time.compareTo(other.time) + return if (cmp == 0) id.compareTo(other.id) else cmp + } - class Dispatch(time: Long, val block: Runnable) : Event(time) { + class Dispatch(time: Long, id: Long, val block: Runnable) : Event(time, id) { override fun run() = block.run() override fun toString(): String = "Dispatch[$time]" } - class Resume(time: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation) : Event(time) { - @InternalCoroutinesApi + class Resume(time: Long, id: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation) : Event(time, id) { + @ExperimentalCoroutinesApi override fun run() { with(continuation) { dispatcher.resumeUndispatched(Unit) } } @@ -323,7 +335,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun toString(): String = "Resume[$time]" } - class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle { + class Timeout(time: Long, id: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time, id), DisposableHandle { override fun run() { if (!cancelled) { block.run() @@ -334,10 +346,10 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : cancelled = true } - override fun toString(): String = "Dispatch[$time]" + override fun toString(): String = "Timeout[$time]" } - class Send(time: Long, val channel: ChannelImpl, val message: T) : Event(time) { + class Send(time: Long, id: Long, val channel: ChannelImpl, val message: T) : Event(time, id) { override fun run() { channel.send(message) } -- cgit v1.2.3 From 5de6ec076fa8bc19c34449bcc085dca184d2e17f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Feb 2020 13:50:57 +0100 Subject: feat: Add helper methods for req-res pattern This change adds helper methods for simplifying the request-response pattern commonly used in models. --- .../src/main/kotlin/com/atlarge/odcsim/Channels.kt | 52 ++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt new file mode 100644 index 00000000..b15ce3ae --- /dev/null +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim + +suspend fun SendRef.ask(block: (SendRef) -> T): U { + val ctx = processContext + val outlet = ctx.connect(this) + val channel = ctx.open() + try { + outlet.send(block(channel.send)) + } finally { + outlet.close() + } + + val inlet = ctx.listen(channel.receive) + try { + return inlet.receive() + } finally { + inlet.close() + } +} + +suspend fun SendRef.sendOnce(msg: T) { + val outlet = processContext.connect(this) + try { + outlet.send(msg) + } finally { + outlet.close() + } +} -- cgit v1.2.3 From 30d03df110bd0f2f805eaf89026660926929fa38 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Feb 2020 13:53:00 +0100 Subject: refactor: Reimplement OpenDC model using 2.x API --- opendc/README.md | 8 + opendc/build.gradle.kts | 23 ++ opendc/opendc-core/build.gradle.kts | 39 +++ .../main/kotlin/com/atlarge/opendc/core/Broker.kt | 41 +++ .../main/kotlin/com/atlarge/opendc/core/Cluster.kt | 55 +++++ .../kotlin/com/atlarge/opendc/core/Environment.kt | 36 +++ .../kotlin/com/atlarge/opendc/core/Identity.kt | 42 ++++ .../main/kotlin/com/atlarge/opendc/core/Model.kt | 51 ++++ .../kotlin/com/atlarge/opendc/core/Platform.kt | 103 ++++++++ .../main/kotlin/com/atlarge/opendc/core/User.kt | 35 +++ .../main/kotlin/com/atlarge/opendc/core/Zone.kt | 194 +++++++++++++++ .../opendc/core/resources/compute/Machine.kt | 107 ++++++++ .../opendc/core/resources/compute/MachineStatus.kt | 33 +++ .../core/resources/compute/ProcessingElement.kt | 33 +++ .../core/resources/compute/ProcessingUnit.kt | 44 ++++ .../opendc/core/resources/compute/host/Host.kt | 83 +++++++ .../compute/scheduling/MachineScheduler.kt | 42 ++++ .../compute/scheduling/MachineSchedulerLogic.kt | 64 +++++ .../compute/scheduling/ProcessObserver.kt | 77 ++++++ .../resources/compute/scheduling/ProcessState.kt | 50 ++++ .../resources/compute/scheduling/ProcessView.kt | 53 ++++ .../scheduling/SpaceSharedMachineScheduler.kt | 189 ++++++++++++++ .../compute/supervision/MachineSupervisionEvent.kt | 49 ++++ .../compute/supervision/MachineSupervisor.kt | 76 ++++++ .../com/atlarge/opendc/core/services/Service.kt | 47 ++++ .../com/atlarge/opendc/core/services/ServiceMap.kt | 49 ++++ .../opendc/core/services/ServiceProvider.kt | 68 +++++ .../services/provisioning/ProvisioningService.kt | 76 ++++++ .../provisioning/SimpleProvisioningService.kt | 107 ++++++++ .../opendc/core/services/resources/HostView.kt | 42 ++++ .../resources/ResourceManagementService.kt | 120 +++++++++ .../com/atlarge/opendc/core/workload/Workload.kt | 39 +++ .../core/workload/application/Application.kt | 49 ++++ .../core/workload/application/FlopsApplication.kt | 114 +++++++++ .../opendc/core/workload/application/Process.kt | 90 +++++++ .../core/workload/application/ProcessSupervisor.kt | 83 +++++++ opendc/opendc-experiments-tpds/build.gradle.kts | 47 ++++ .../opendc/experiments/tpds/TestExperiment.kt | 143 +++++++++++ .../src/main/resources/env/setup-test.json | 36 +++ opendc/opendc-format/build.gradle.kts | 42 ++++ .../opendc/format/environment/EnvironmentReader.kt | 38 +++ .../opendc/format/environment/sc18/Model.kt | 44 ++++ .../environment/sc18/Sc18EnvironmentReader.kt | 95 +++++++ .../com/atlarge/opendc/format/trace/TraceEntry.kt | 54 ++++ .../com/atlarge/opendc/format/trace/TraceReader.kt | 37 +++ .../com/atlarge/opendc/format/trace/TraceWriter.kt | 45 ++++ .../opendc/format/trace/gwf/GwfTraceReader.kt | 167 +++++++++++++ opendc/opendc-workflows/build.gradle.kts | 39 +++ .../workflows/service/StageWorkflowScheduler.kt | 59 +++++ .../service/StageWorkflowSchedulerLogic.kt | 275 +++++++++++++++++++++ .../opendc/workflows/service/WorkflowScheduler.kt | 48 ++++ .../workflows/service/WorkflowSchedulerLogic.kt | 56 +++++ .../workflows/service/WorkflowSchedulerMode.kt | 40 +++ .../opendc/workflows/service/WorkflowService.kt | 184 ++++++++++++++ .../service/stage/job/FifoJobSortingPolicy.kt | 37 +++ .../service/stage/job/JobAdmissionPolicy.kt | 48 ++++ .../service/stage/job/JobSortingPolicy.kt | 44 ++++ .../service/stage/job/NullJobAdmissionPolicy.kt | 40 +++ .../service/stage/job/RandomJobSortingPolicy.kt | 40 +++ .../resource/FirstFitResourceSelectionPolicy.kt | 40 +++ .../FunctionalResourceDynamicFilterPolicy.kt | 43 ++++ .../stage/resource/ResourceDynamicFilterPolicy.kt | 49 ++++ .../stage/resource/ResourceSelectionPolicy.kt | 48 ++++ .../service/stage/task/FifoTaskSortingPolicy.kt | 37 +++ .../stage/task/FunctionalTaskEligibilityPolicy.kt | 38 +++ .../service/stage/task/RandomTaskSortingPolicy.kt | 40 +++ .../service/stage/task/TaskEligibilityPolicy.kt | 48 ++++ .../service/stage/task/TaskSortingPolicy.kt | 45 ++++ .../com/atlarge/opendc/workflows/workload/Job.kt | 48 ++++ .../com/atlarge/opendc/workflows/workload/Task.kt | 48 ++++ settings.gradle.kts | 4 + 71 files changed, 4557 insertions(+) create mode 100644 opendc/README.md create mode 100644 opendc/build.gradle.kts create mode 100644 opendc/opendc-core/build.gradle.kts create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Broker.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Identity.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/User.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/MachineStatus.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessState.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/Service.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceMap.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/Workload.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt create mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt create mode 100644 opendc/opendc-experiments-tpds/build.gradle.kts create mode 100644 opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt create mode 100644 opendc/opendc-experiments-tpds/src/main/resources/env/setup-test.json create mode 100644 opendc/opendc-format/build.gradle.kts create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt create mode 100644 opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt create mode 100644 opendc/opendc-workflows/build.gradle.kts create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt diff --git a/opendc/README.md b/opendc/README.md new file mode 100644 index 00000000..e3c65ba7 --- /dev/null +++ b/opendc/README.md @@ -0,0 +1,8 @@ +

+ + OpenDC + +
+ OpenDC +

+ diff --git a/opendc/build.gradle.kts b/opendc/build.gradle.kts new file mode 100644 index 00000000..cc3f3add --- /dev/null +++ b/opendc/build.gradle.kts @@ -0,0 +1,23 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ diff --git a/opendc/opendc-core/build.gradle.kts b/opendc/opendc-core/build.gradle.kts new file mode 100644 index 00000000..0ac1f1ea --- /dev/null +++ b/opendc/opendc-core/build.gradle.kts @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Base model for datacenter simulation" + +/* Build configuration */ +plugins { + `kotlin-library-convention` +} + +dependencies { + implementation(kotlin("stdlib")) + api(project(":odcsim:odcsim-api")) + + testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") + testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Broker.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Broker.kt new file mode 100644 index 00000000..a3d6b0a7 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Broker.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef + +/** + * A broker acting on the various cloud platforms on behalf of the user. + */ +interface Broker { + /** + * Build the runtime behavior of the [Broker]. + * + * @param platforms A list of available cloud platforms. + * @return The runtime behavior of the broker. + */ + suspend operator fun invoke(ctx: ProcessContext, platforms: List>) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt new file mode 100644 index 00000000..da9aed00 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.resources.compute.MachineMessage +import com.atlarge.opendc.core.resources.compute.host.Host +import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent +import java.util.UUID + +/** + * A logical grouping of heterogeneous hosts and primary storage within a zone. + * + * @property uid The unique identifier of the cluster. + * @property name The name of this cluster. + * @property hosts The machines in this cluster. + */ +data class Cluster(override val uid: UUID, override val name: String, val hosts: List) : Identity { + /** + * Build the runtime [Behavior] of this cluster of hosts. + * + * @param manager The manager of the cluster. + */ + suspend operator fun invoke(ctx: ProcessContext, manager: SendRef) { + // Launch all hosts in the cluster + for (host in hosts) { + val channel = ctx.open() + ctx.spawn({ host(it, manager, channel) }, name = host.name) + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt new file mode 100644 index 00000000..5bdff0b6 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +/** + * A description of a large-scale computing environment. This description includes including key size and topology + * information of the environment, types of resources, but also various operational and management rules such as + * scheduled maintenance, allocation and other constraints. + * + * @property name The name of the environment. + * @property description A small textual description about the environment that is being modeled + * @property platforms The cloud platforms (such as AWS or GCE) in this environment. + */ +data class Environment(val name: String, val description: String?, val platforms: List) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Identity.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Identity.kt new file mode 100644 index 00000000..c87e934f --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Identity.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import java.util.UUID + +/** + * An object that has a unique identity. + */ +interface Identity { + /** + * A unique, opaque, system-generated value, representing the object. + */ + val uid: UUID + + /** + * A non-empty, human-readable string representing the object. + */ + val name: String +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt new file mode 100644 index 00000000..3d16c4b2 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.ProcessContext + +/** + * A simulation model for large-scale simulation of datacenter infrastructure, built with the *odcsim* API. + * + * @property environment The environment in which brokers operate. + * @property brokers The brokers acting on the cloud platforms. + */ +data class Model(val environment: Environment, val brokers: List) { + /** + * Build the runtime behavior of the universe. + */ + suspend operator fun invoke(ctx: ProcessContext) { + // Setup the environment + val platforms = environment.platforms.map { platform -> + val channel = ctx.open() + ctx.spawn({ platform(it, channel.receive) }, name = platform.name) + channel.send + } + + for (broker in brokers) { + ctx.spawn { broker(it, platforms) } + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt new file mode 100644 index 00000000..fab67962 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt @@ -0,0 +1,103 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.ask +import com.atlarge.odcsim.sendOnce +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * A representation of a cloud platform such as Amazon Web Services (AWS), Microsoft Azure or Google Cloud. + * + * @property uid The unique identifier of this datacenter. + * @property name the name of the platform. + * @property zones The availability zones available on this platform. + */ +data class Platform(override val uid: UUID, override val name: String, val zones: List) : Identity { + /** + * Build the runtime [Behavior] of this cloud platform. + */ + suspend operator fun invoke(ctx: ProcessContext, main: ReceiveRef) { + println("Starting cloud platform $name [$uid] with ${zones.size} zones") + + // Launch all zones of the cloud platform + val zoneInstances = zones.associateWith { zone -> + val channel = ctx.open() + ctx.spawn({ zone(it, channel) }, name = zone.name) + channel.send + } + + val inlet = ctx.listen(main) + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is PlatformMessage.ListZones -> { + msg.replyTo.sendOnce(PlatformResponse.Zones(this@Platform, zoneInstances.mapKeys { it.key.name })) + } + } + } + } + inlet.close() + } +} + +/** + * A message protocol for communicating with a cloud platform. + */ +sealed class PlatformMessage { + /** + * Request the available zones on this platform. + * + * @property replyTo The actor address to send the response to. + */ + data class ListZones(val replyTo: SendRef) : PlatformMessage() +} + +/** + * A message protocol used by platform actors to respond to [PlatformMessage]s. + */ +sealed class PlatformResponse { + /** + * The zones available on this cloud platform. + * + * @property platform The reference to the cloud platform these are the zones of. + * @property zones The zones in this cloud platform. + */ + data class Zones(val platform: Platform, val zones: Map>) : PlatformResponse() +} + +/** + * Retrieve the available zones of a platform. + */ +suspend fun SendRef.zones(): Map> { + val zones: PlatformResponse.Zones = ask { PlatformMessage.ListZones(it) } + return zones.zones +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/User.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/User.kt new file mode 100644 index 00000000..6105ae9e --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/User.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +/** + * A user of the cloud network. + */ +interface User : Identity { + /** + * The name of the user. + */ + override val name: String +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt new file mode 100644 index 00000000..07361423 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt @@ -0,0 +1,194 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.ask +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.services.Service +import com.atlarge.opendc.core.services.ServiceProvider +import java.util.ArrayDeque +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An isolated location within a datacenter region from which public cloud services operate, roughly equivalent to a + * single datacenter. Zones contain one or more clusters and secondary storage. + * + * This class models *only* the static information of a zone, with dynamic information being contained within the zone's + * actor. During runtime, it's actor acts as a registry for all the cloud services provided by the zone. + * + * @property uid The unique identifier of this availability zone + * @property name The name of the zone within its platform. + * @property services The initial set of services provided by the zone. + * @property clusters The clusters of machines in this zone. + */ +data class Zone( + override val uid: UUID, + override val name: String, + val services: Set, + val clusters: List +) : Identity { + /** + * Build the runtime [Behavior] of this datacenter. + */ + suspend operator fun invoke(ctx: ProcessContext, main: Channel) { + println("Starting zone $name [$uid]") + + // Launch all services of the zone + val instances: MutableMap, SendRef<*>> = mutableMapOf() + validateDependencies(services) + + for (provider in services) { + val channel = ctx.open() + println("Spawning service ${provider.name}") + ctx.spawn({ provider(it, this, main.send, channel) }, name = "${provider.name}-${provider.uid}") + provider.provides.forEach { instances[it] = channel.send } + } + + val inlet = ctx.listen(main.receive) + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is ZoneMessage.Find -> { + msg.replyTo.sendOnce(ZoneResponse.Listing(this@Zone, msg.key, instances[msg.key])) + } + } + } + } + } + + /** + * Validate the service for unsatisfiable dependencies. + */ + private fun validateDependencies(providers: Set) { + val providersByKey = HashMap, ServiceProvider>() + for (provider in providers) { + if (provider.provides.isEmpty()) { + throw IllegalArgumentException(("Service provider $provider does not provide any service.")) + } + for (key in provider.provides) { + if (key in providersByKey) { + throw IllegalArgumentException("Multiple providers for service $key") + } + providersByKey[key] = provider + } + } + + val visited = HashSet() + val queue = ArrayDeque(providers) + while (queue.isNotEmpty()) { + val service = queue.poll() + visited.add(service) + + for (dependencyKey in service.dependencies) { + val dependency = providersByKey[dependencyKey] + ?: throw IllegalArgumentException("Dependency $dependencyKey not satisfied for service $service") + if (dependency !in visited) { + queue.add(dependency) + } + } + } + } + + override fun equals(other: Any?): Boolean = other is Zone && uid == other.uid + override fun hashCode(): Int = uid.hashCode() +} + +/** + * A message protocol for communicating with the service registry + */ +sealed class ZoneMessage { + /** + * Lookup the specified service in this availability zone. + * + * @property key The key of the service to lookup. + * @property replyTo The address to reply to. + */ + data class Find( + val key: Service<*>, + val replyTo: SendRef + ) : ZoneMessage() +} + +/** + * A message protocol used by service registry actors to respond to [ZoneMessage]s. + */ +sealed class ZoneResponse { + /** + * The response sent when looking up services in a zone. + * + * @property zone The zone from which the response originates. + * @property key The key of the service that was looked up. + * @property ref The reference to the service or `null` if it is not present in the zone. + */ + data class Listing( + val zone: Zone, + val key: Service<*>, + private val ref: SendRef<*>? + ) : ZoneResponse() { + /** + * A flag to indicate whether the service is present. + */ + val isPresent: Boolean + get() = ref != null + + /** + * Determine whether this listing is for the specified key. + * + * @param key The key to check for. + * @return `true` if the listing is for this key, `false` otherwise. + */ + fun isForKey(key: Service<*>): Boolean = key == this.key + + /** + * Extract the result from the service lookup. + * + * @param key The key of the lookup. + * @return The reference to the service or `null` if it is not present in the zone. + */ + operator fun invoke(key: Service): SendRef? { + require(this.key == key) { "Invalid key" } + @Suppress("UNCHECKED_CAST") + return ref as? SendRef + } + } +} + +/** + * Find the reference to the specified [ServiceProvider]. + * + * @param key The key of the service to find. + * @throws IllegalArgumentException if the service is not found. + */ +suspend fun SendRef.find(key: Service): SendRef { + val listing: ZoneResponse.Listing = ask { ZoneMessage.Find(key, it) } + return listing(key) ?: throw IllegalArgumentException("Unknown key $key") +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt new file mode 100644 index 00000000..f25fa3cc --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt @@ -0,0 +1,107 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.Identity +import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.Pid + +/** + * A generic representation of a compute node (either physical or virtual) that is able to run [Application]s. + */ +interface Machine : Identity { + /** + * The details of the machine in key/value pairs. + */ + val details: Map + + /** + * Build the runtime [Behavior] of this compute resource, accepting messages of [MachineMessage]. + * + * @param supervisor The supervisor of the machine. + */ + suspend operator fun invoke(ctx: ProcessContext, supervisor: SendRef, main: Channel) +} + +/** + * A reference to a machine instance that accepts messages of type [MachineMessage]. + */ +typealias MachineRef = SendRef + +/** + * A message protocol for communicating with machine instances. + */ +sealed class MachineMessage { + /** + * Launch the specified [Application] on the machine instance. + * + * @property application The application to submit. + * @property key The key to identify this submission. + * @property broker The broker of the process to spawn. + */ + data class Submit( + val application: Application, + val key: Any, + val broker: SendRef + ) : MachineMessage() +} + +/** + * A message protocol used by machine instances to respond to [MachineMessage]s. + */ +sealed class MachineEvent { + /** + * Indicate that an [Application] was spawned on a machine instance. + * + * @property instance The machine instance to which the application was submitted. + * @property application The application that has been submitted. + * @property key The key used to identify the submission. + * @property pid The spawned application instance. + */ + data class Submitted( + val instance: MachineRef, + val application: Application, + val key: Any, + val pid: Pid + ) : MachineEvent() + + /** + * Indicate that an [Application] has terminated on the specified machine. + * + * @property instance The machine instance to which the application was submitted. + * @property pid The reference to the application instance that has terminated. + * @property status The exit code of the task, where zero means successful. + */ + data class Terminated( + val instance: MachineRef, + val pid: Pid, + val status: Int = 0 + ) : MachineEvent() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/MachineStatus.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/MachineStatus.kt new file mode 100644 index 00000000..af039bcc --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/MachineStatus.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute + +/** + * The status of a machine. + */ +enum class MachineStatus { + HALT, + RUNNING +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt new file mode 100644 index 00000000..23a5b444 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute + +/** + * A logical core in a CPU. + * + * @property id The identifier of the core within the CPU. + * @property unit The [ProcessingUnit] the core is part of. + */ +data class ProcessingElement(val id: Int, val unit: ProcessingUnit) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt new file mode 100644 index 00000000..76985f64 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute + +/** + * A processing unit of a compute resource, either virtual or physical. + * + * @property vendor The vendor string of the cpu. + * @property family The cpu family number. + * @property model The model number of the cpu. + * @property modelName The name of the cpu model. + * @property clockRate The clock speed of the cpu in MHz. + * @property cores The number of logical cores in the cpu. + */ +data class ProcessingUnit( + val vendor: String, + val family: Int, + val model: Int, + val modelName: String, + val clockRate: Double, + val cores: Int +) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt new file mode 100644 index 00000000..21217468 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt @@ -0,0 +1,83 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.host + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineMessage +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import com.atlarge.opendc.core.resources.compute.scheduling.MachineScheduler +import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * A physical compute node in a datacenter that is able to run [Application]s. + * + * @property uid The unique identifier of this machine. + * @property name The name of the machine. + * @property scheduler The process scheduler of this machine. + * @property cores The list of processing elements in the machine. + * @property details The details of this host. + */ +data class Host( + override val uid: UUID, + override val name: String, + val scheduler: MachineScheduler, + val cores: List, + override val details: Map = emptyMap() +) : Machine { + /** + * Build the [Behavior] for a physical machine. + */ + override suspend fun invoke(ctx: ProcessContext, supervisor: SendRef, main: Channel) { + coroutineScope { + supervisor.sendOnce(MachineSupervisionEvent.Announce(this@Host, main.send)) + supervisor.sendOnce(MachineSupervisionEvent.Up(main.send)) + + val sched = scheduler(ctx, this, this@Host, main.send) + sched.updateResources(cores) + + val inlet = ctx.listen(main.receive) + while (isActive) { + when (val msg = inlet.receive()) { + is MachineMessage.Submit -> { + sched.submit(msg.application, msg.key, msg.broker) + } + } + } + inlet.close() + } + } + + override fun equals(other: Any?): Boolean = other is Machine && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt new file mode 100644 index 00000000..400c6a0f --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineRef +import kotlinx.coroutines.CoroutineScope + +/** + * A factory interface for constructing a [MachineSchedulerLogic]. + */ +interface MachineScheduler { + /** + * Construct a [MachineSchedulerLogic] in the given [ProcessContext]. + * + * @param machine The machine to create the scheduler for. + */ + operator fun invoke(ctx: ProcessContext, coroutineScope: CoroutineScope, machine: Machine, machineRef: MachineRef): MachineSchedulerLogic +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt new file mode 100644 index 00000000..9bc20eb8 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.ProcessSupervisor +import kotlinx.coroutines.CoroutineScope + +/** + * A scheduler that distributes processes over processing elements in a machine. + * + * @property ctx The context in which the scheduler runs. + * @property machine The machine to create the scheduler for. + */ +abstract class MachineSchedulerLogic( + protected val ctx: ProcessContext, + protected val coroutineScope: CoroutineScope, + protected val machine: Machine, + protected val machineRef: MachineRef +) : ProcessSupervisor { + /** + * Update the available resources in the machine. + * + * @param cores The available processing cores for the scheduler. + */ + abstract suspend fun updateResources(cores: List) + + /** + * Submit the specified application for scheduling. + * + * @param application The application to submit. + * @param key The key to identify the application instance. + * @param handler The broker of this application. + */ + abstract suspend fun submit(application: Application, key: Any, handler: SendRef) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt new file mode 100644 index 00000000..2cfeec06 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt @@ -0,0 +1,77 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.Pid +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An interface for observing processes. + */ +interface ProcessObserver { + /** + * This method is invoked when the setup of an application completed successfully. + * + * @param pid The process id of the process that has been initialized. + */ + fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) + + /** + * This method is invoked when a process exits. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + fun onTermination(instance: MachineRef, pid: Pid, status: Int) + + companion object { + /** + * Create the [Behavior] for a [ProcessObserver]. + * + * @param observer The observer to create the behavior for. + */ + suspend operator fun invoke(ctx: ProcessContext, observer: ProcessObserver, main: ReceiveRef) { + val inlet = ctx.listen(main) + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is MachineEvent.Submitted -> observer.onSubmission(msg.instance, msg.application, msg.key, msg.pid) + is MachineEvent.Terminated -> observer.onTermination(msg.instance, msg.pid, msg.status) + } + } + } + + inlet.close() + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessState.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessState.kt new file mode 100644 index 00000000..e9e9a53e --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessState.kt @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +/** + * An enumeration of the distinct states of an application instance (process). + */ +enum class ProcessState { + /** + * Default state of a process, where the task is waiting to be assigned and installed on a machine. + */ + CREATED, + + /** + * State to indicate that the process is waiting to be ran. + */ + READY, + + /** + * State to indicate that the process is currently running. + */ + RUNNING, + + /** + * State to indicate that the process has been terminated, either successfully or due to failure. + */ + TERMINATED, +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt new file mode 100644 index 00000000..daf71af4 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.Pid +import com.atlarge.opendc.core.workload.application.ProcessMessage + +/** + * A process represents a application instance running on a particular machine from the machine scheduler's point of + * view. + * + * @property application The application this is an instance of. + * @property broker The broker of the process, which is informed about its progress. + * @property pid The reference to the application instance. + * @property state The state of the process. + */ +data class ProcessView( + val application: Application, + val broker: SendRef, + val pid: Pid, + var state: ProcessState = ProcessState.CREATED +) { + /** + * The slice of processing elements allocated for the process. Available as soon as the state + * becomes [ProcessState.RUNNING] + */ + lateinit var allocation: ProcessMessage.Allocation +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt new file mode 100644 index 00000000..6cdc3ea5 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt @@ -0,0 +1,189 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.Pid +import com.atlarge.opendc.core.workload.application.ProcessEvent +import com.atlarge.opendc.core.workload.application.ProcessMessage +import com.atlarge.opendc.core.workload.application.ProcessSupervisor +import java.util.ArrayDeque +import java.util.UUID +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +/** + * A machine scheduling policy where processes are space-shared on the machine. + * + * Space-sharing for machine scheduling means that all running processes will be allocated a separate set of the + * [ProcessingElement]s in a [Machine]. Applications are scheduled on the machine in first-in-first-out (FIFO) order, + * thus larger applications may block smaller tasks from proceeding, while space is available (no backfilling). + * + * @property ctx The context in which the scheduler runs. + * @property machine The machine to create the scheduler for. + */ +class SpaceSharedMachineScheduler( + ctx: ProcessContext, + coroutineScope: CoroutineScope, + machine: Machine, + machineRef: MachineRef +) : MachineSchedulerLogic(ctx, coroutineScope, machine, machineRef), ProcessSupervisor { + private var cores = 0 + private val available = ArrayDeque() + private val queue = ArrayDeque>() + private val running = LinkedHashSet>() + private val processes = HashMap, ProcessView>() + private val jobs = HashMap, Job>() + private val channel = ctx.open() + + init { + coroutineScope.launch { + ProcessSupervisor(ctx, this@SpaceSharedMachineScheduler, channel.receive) + } + } + + override suspend fun updateResources(cores: List) { + available.addAll(cores) + this.cores = cores.size + + // Add all running tasks in front of the queue + running.reversed().forEach { queue.addFirst(it) } + running.clear() + + reschedule() + } + + override suspend fun submit(application: Application, key: Any, handler: SendRef) { + val channel = ctx.open() + val pid = channel.send + // Create application instance on the machine + ctx.spawn({ application(it, pid, channel.receive) }, name = application.name + ":" + application.uid + ":" + UUID.randomUUID().toString()) + processes[pid] = ProcessView(application, handler, pid) + + // Inform the owner that the task has been submitted + handler.sendOnce(MachineEvent.Submitted(machineRef, application, key, pid)) + + // Setup the task + pid.sendOnce(ProcessMessage.Setup(machine, this@SpaceSharedMachineScheduler.channel.send)) + } + + /** + * Reschedule the tasks on this machine. + */ + private fun reschedule() { + while (queue.isNotEmpty()) { + val pid = queue.peek() + val process = processes[pid]!! + + if (process.application.cores >= cores) { + // The task will never fit on the machine + // TODO Fail task + println("Process $process will not fit in machine: dropping.") + queue.remove() + return + } else if (process.application.cores > available.size) { + // The task will not fit at the moment + // Try again if resources become available + // ctx.log.debug("Application queued: not enough processing elements available [requested={}, available={}]", + // process.application.cores, available.size) + return + } + queue.remove() + + // Compute the available resources + val resources = List(process.application.cores) { + val pe = available.poll() + Pair(pe, 1.0) + }.toMap() + process.state = ProcessState.RUNNING + process.allocation = ProcessMessage.Allocation(resources, Long.MAX_VALUE) + running += pid + + coroutineScope.launch(Dispatchers.Unconfined) { + pid.sendOnce(process.allocation) + } + } + } + + override fun onReady(pid: Pid) { + val process = processes[pid]!! + + // Schedule the task if it has been setup + queue.add(pid) + process.state = ProcessState.READY + + reschedule() + } + + override fun onConsume(pid: Pid, utilization: Map, until: Long) { + val process = processes[pid]!! + val allocation = process.allocation + + if (until > allocation.until) { + // Tasks are not allowed to extend allocation provided by the machine + // TODO Fail the task + println("Task $pid must not extend allocation provided by the machine") + } else if (until < allocation.until) { + // Shrink allocation + process.allocation = allocation.copy(until = until) + } + + // Reschedule the process after the allocation expires + jobs[pid] = coroutineScope.launch { + delay(process.allocation.until - ctx.clock.millis()) + // We just extend the allocation + process.allocation = process.allocation.copy(until = Long.MAX_VALUE) + pid.sendOnce(process.allocation) + } + } + + override fun onExit(pid: Pid, status: Int) { + val process = processes.remove(pid)!! + running -= pid + jobs[pid]?.cancel() + process.allocation.resources.keys.forEach { available.add(it) } + + // Inform the owner that the task has terminated + coroutineScope.launch { + process.broker.sendOnce(MachineEvent.Terminated(machineRef, pid, status)) + } + } + + companion object : MachineScheduler { + override fun invoke(ctx: ProcessContext, coroutineScope: CoroutineScope, machine: Machine, machineRef: MachineRef): MachineSchedulerLogic { + return SpaceSharedMachineScheduler(ctx, coroutineScope, machine, machineRef) + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt new file mode 100644 index 00000000..8a022112 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.supervision + +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineRef + +/** + * A supervision protocol for [Machine] instances. + */ +sealed class MachineSupervisionEvent { + /** + * Initialization message to introduce to the supervisor a new machine by specifying its static information and + * address. + * + * @property machine The machine that is being announced. + * @property ref The address to talk to the host. + */ + data class Announce(val machine: Machine, val ref: MachineRef) : MachineSupervisionEvent() + + /** + * Indicate that the specified machine has booted up. + * + * @property ref The address to talk to the machine. + */ + data class Up(val ref: MachineRef) : MachineSupervisionEvent() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt new file mode 100644 index 00000000..37cf9d44 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.supervision + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineRef +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An interface for supervising [Machine] instances. + */ +interface MachineSupervisor { + /** + * This method is invoked when a new machine is introduced to the supervisor by specifying its static information + * and address. + * + * @param machine The machine that is being announced. + * @param ref The address to talk to the host. + */ + fun onAnnounce(machine: Machine, ref: MachineRef) + + /** + * This method is invoked when a process exits. + * + * @param ref The address to talk to the machine. + */ + fun onUp(ref: MachineRef) + + companion object { + /** + * Create the [Behavior] for a [MachineSupervisor]. + * + * @param supervisor The supervisor to create the behavior for. + */ + suspend operator fun invoke(ctx: ProcessContext, supervisor: MachineSupervisor, main: ReceiveRef) { + val inlet = ctx.listen(main) + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is MachineSupervisionEvent.Announce -> supervisor.onAnnounce(msg.machine, msg.ref) + is MachineSupervisionEvent.Up -> supervisor.onUp(msg.ref) + } + } + } + + inlet.close() + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/Service.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/Service.kt new file mode 100644 index 00000000..8dcec760 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/Service.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services + +import com.atlarge.opendc.core.Identity +import java.util.UUID + +/** + * An interface for identifying service implementations of the same type (providing the same service). + * + * @param T The shape of the messages the service responds to. + */ +interface Service : Identity + +/** + * Helper class for constructing a [Service]. + * + * @property uid The unique identifier of the service. + * @property name The name of the service. + */ +abstract class AbstractService(override val uid: UUID, override val name: String) : Service { + override fun equals(other: Any?): Boolean = other is Service<*> && uid == other.uid + override fun hashCode(): Int = uid.hashCode() + override fun toString(): String = "Service[uid=$uid,name=$name]" +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceMap.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceMap.kt new file mode 100644 index 00000000..14cf4845 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceMap.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services + +import com.atlarge.odcsim.SendRef + +/** + * A map containing services. + */ +interface ServiceMap { + /** + * Determine if this map contains the service with the specified [Service]. + * + * @param key The key of the service to check for. + * @return `true` if the service is in the map, `false` otherwise. + */ + operator fun contains(key: Service<*>): Boolean + + /** + * Obtain the service with the specified [Service]. + * + * @param key The key of the service to obtain. + * @return The references to the service. + * @throws IllegalArgumentException if the key does not exists in the map. + */ + operator fun get(key: Service): SendRef +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt new file mode 100644 index 00000000..3592d578 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.Identity +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.ZoneMessage +import java.util.UUID + +/** + * An abstract representation of a cloud service implementation provided by a cloud platform. + */ +interface ServiceProvider : Identity { + /** + * The unique identifier of the service implementation. + */ + override val uid: UUID + + /** + * The name of the service implementation. + */ + override val name: String + + /** + * The set of services provided by this [ServiceProvider]. + */ + val provides: Set> + + /** + * The dependencies of the service implementation. + */ + val dependencies: Set> + + /** + * Build the runtime [Behavior] for this service. + * + * @param zone The zone model for which the service should be build. + * @param zoneRef The runtime reference to the zone's actor for communication. + * @param main The channel on which the service should listen. + */ + suspend operator fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef, main: Channel) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt new file mode 100644 index 00000000..604e1942 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services.provisioning + +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.services.AbstractService +import com.atlarge.opendc.core.services.Service +import com.atlarge.opendc.core.services.ServiceProvider +import com.atlarge.opendc.core.services.resources.HostView +import java.util.UUID + +/** + * A cloud platform service that provisions the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +abstract class ProvisioningService : ServiceProvider { + override val provides: Set> = setOf(ProvisioningService) + + /** + * The service key of the provisioner service. + */ + companion object : AbstractService(UUID.randomUUID(), "provisioner") +} + +/** + * A message protocol for communicating to the resource provisioner. + */ +sealed class ProvisioningMessage { + /** + * Request the specified number of resources from the provisioner. + * + * @property numHosts The number of hosts to request from the provisioner. + * @property replyTo The actor to reply to. + */ + data class Request(val numHosts: Int, val replyTo: SendRef) : ProvisioningMessage() + + /** + * Release the specified resource [ProvisioningResponse.Lease]. + * + * @property lease The lease to release. + */ + data class Release(val lease: ProvisioningResponse.Lease) : ProvisioningMessage() +} + +/** + * A message protocol used by the resource provisioner to respond to [ProvisioningMessage]s. + */ +sealed class ProvisioningResponse { + /** + * A lease for the specified hosts. + */ + data class Lease(val hosts: List) : ProvisioningResponse() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt new file mode 100644 index 00000000..5f77e1a1 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt @@ -0,0 +1,107 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services.provisioning + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.ask +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.ZoneMessage +import com.atlarge.opendc.core.find +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.services.Service +import com.atlarge.opendc.core.services.resources.HostView +import com.atlarge.opendc.core.services.resources.ResourceManagementMessage +import com.atlarge.opendc.core.services.resources.ResourceManagementResponse +import com.atlarge.opendc.core.services.resources.ResourceManagementService +import java.util.ArrayDeque +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive + +/** + * A cloud platform service that provisions the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +object SimpleProvisioningService : ProvisioningService() { + override val uid: UUID = UUID.randomUUID() + override val name: String = "simple-provisioner" + override val dependencies: Set> = setOf(ResourceManagementService) + + /** + * Build the runtime [Behavior] for the resource provisioner, responding to messages of shape [ProvisioningMessage]. + */ + override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef, main: Channel) { + val inlet = ctx.listen(main.receive) + val manager = zoneRef.find(ResourceManagementService) + + delay(10) + + val hosts = mutableMapOf() + val available = ArrayDeque() + val leases = mutableSetOf() + + // Subscribe to all machines in the zone + for (cluster in zone.clusters) { + for (host in cluster.hosts) { + val msg: ResourceManagementResponse.Listing = manager.ask { ResourceManagementMessage.Lookup(host, it) } + if (msg.instance != null) { + hosts[msg.instance.ref] = msg.instance + available.add(msg.instance) + } + } + } + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is ProvisioningMessage.Request -> { + println("Provisioning ${msg.numHosts} hosts") + val leaseHosts = mutableListOf() + while (available.isNotEmpty() && leaseHosts.size < msg.numHosts) { + leaseHosts += available.poll() + } + val lease = ProvisioningResponse.Lease(leaseHosts) + leases += lease + msg.replyTo.sendOnce(lease) + } + is ProvisioningMessage.Release -> { + val lease = msg.lease + if (lease in leases) { + return@coroutineScope + } + available.addAll(lease.hosts) + leases -= lease + } + } + } + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt new file mode 100644 index 00000000..60dd2eb9 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services.resources + +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.resources.compute.MachineStatus +import com.atlarge.opendc.core.resources.compute.host.Host + +/** + * The dynamic information of a [Host] instance that is being tracked by the [ResourceManagementService]. This means + * that information may not be up-to-date. + * + * @property host The static information of the host. + * @property ref The reference to the host's actor. + * @property status The status of the machine. + */ +data class HostView(val host: Host, val ref: MachineRef, val status: MachineStatus = MachineStatus.HALT) { + override fun equals(other: Any?): Boolean = other is HostView && host.uid == other.host.uid + override fun hashCode(): Int = host.uid.hashCode() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt new file mode 100644 index 00000000..cc032952 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt @@ -0,0 +1,120 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services.resources + +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.ZoneMessage +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.resources.compute.MachineStatus +import com.atlarge.opendc.core.resources.compute.host.Host +import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.core.services.Service +import com.atlarge.opendc.core.services.ServiceProvider +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * A cloud platform service that manages the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +object ResourceManagementService : ServiceProvider, Service { + override val uid: UUID = UUID.randomUUID() + override val name: String = "resource-manager" + override val provides: Set> = setOf(ResourceManagementService) + override val dependencies: Set> = emptySet() + + /** + * Build the runtime behavior of the [ResourceManagementService]. + */ + override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef, main: Channel) { + // Launch the clusters of the zone + for (cluster in zone.clusters) { + ctx.spawn({ cluster(it, main.send) }, name = "${cluster.name}-${cluster.uid}") + } + + val hosts = mutableMapOf() + val inlet = ctx.listen(main.receive) + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is MachineSupervisionEvent.Announce -> { + val host = msg.machine as? Host + if (host != null) { + hosts[msg.ref] = HostView(host, msg.ref) + } + } + is MachineSupervisionEvent.Up -> { + hosts.computeIfPresent(msg.ref) { _, value -> + value.copy(status = MachineStatus.RUNNING) + } + } + is ResourceManagementMessage.Lookup -> { + msg.replyTo.sendOnce(ResourceManagementResponse.Listing(hosts.values.find { it.host == msg.host })) + } + } + } + } + } +} + +/** + * A reference to the resource manager of a zone. + */ +typealias ResourceManagerRef = SendRef + +/** + * A message protocol for communicating to the resource manager. + */ +sealed class ResourceManagementMessage { + /** + * Lookup the specified [Host]. + * + * @property host The host to lookup. + * @property replyTo The address to sent the response to. + */ + data class Lookup( + val host: Host, + val replyTo: SendRef + ) : ResourceManagementMessage() +} + +/** + * A message protocol used by the resource manager to respond to [ResourceManagementMessage]s. + */ +sealed class ResourceManagementResponse { + /** + * A response to a [ResourceManagementMessage.Lookup] request. + * + * @property instance The instance that was found or `null` if it does not exist. + */ + data class Listing(val instance: HostView?) : ResourceManagementResponse() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/Workload.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/Workload.kt new file mode 100644 index 00000000..def5d6e4 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/Workload.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload + +import com.atlarge.opendc.core.Identity +import com.atlarge.opendc.core.User + +/** + * A high-level abstraction that represents the actual work that a set of compute resources perform, such + * as running an application on a machine or a whole workflow running multiple tasks on numerous machines. + */ +interface Workload : Identity { + /** + * The owner of this workload. + */ + val owner: User +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt new file mode 100644 index 00000000..d1ccd347 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.opendc.core.workload.Workload + +/** + * A generic representation of a workload that can directly be executed by physical or virtual compute resources, + * such as a web server application. + */ +interface Application : Workload { + /** + * The number of processing elements required by the task. + */ + val cores: Int + + /** + * Build the runtime [Behavior] of an application, accepting messages of [ProcessMessage]. + * + * This is a model for the runtime behavior of an application instance (process) that describes how an application + * instance consumes the allocated resources on a machine. + */ + suspend operator fun invoke(ctx: ProcessContext, pid: Pid, main: ReceiveRef) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt new file mode 100644 index 00000000..a2dbacf1 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt @@ -0,0 +1,114 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.User +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import java.util.UUID +import kotlin.math.ceil +import kotlin.math.min +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An [Application] implementation that models applications performing a static number of floating point operations + * ([flops]) on a compute resource. + * + * @property uid A unique identifier for this application. + * @property name The name of the application. + * @property owner The owner of this application. + * @property cores The number of cores needed for this application. + * @property flops The number of floating point operations to perform for this task. + */ +class FlopsApplication( + override val uid: UUID, + override val name: String, + override val owner: User, + override val cores: Int, + val flops: Long +) : Application { + + init { + require(flops >= 0) { "Negative number of flops" } + } + + /** + * Build the runtime [Behavior] based on a number of floating point operations to execute. + */ + override suspend fun invoke(ctx: ProcessContext, pid: Pid, main: ReceiveRef) { + val inlet = ctx.listen(main) + var remaining = flops + var start: Long = 0 + lateinit var allocation: Map + + val created = inlet.receive() as ProcessMessage.Setup + val ref = created.ref + + ref.sendOnce(ProcessEvent.Ready(pid)) + + suspend fun processAllocation(resources: Map, until: Long) { + start = ctx.clock.millis() + allocation = resources + .asSequence() + .take(cores) + .associateBy({ it.key }, { it.value }) + + val speed = allocation.asSequence() + .map { (key, value) -> key.unit.clockRate * value } + .average() + val finishedAt = ceil(ctx.clock.millis() + remaining / speed).toLong() + ref.sendOnce(ProcessEvent.Consume(pid, allocation, min(finishedAt, until))) + } + + var msg = inlet.receive() as ProcessMessage.Allocation + processAllocation(msg.resources, msg.until) + + coroutineScope { + while (isActive) { + msg = inlet.receive() as ProcessMessage.Allocation + + /* Compute the consumption of flops */ + val consumed = allocation.asSequence() + .map { (key, value) -> key.unit.clockRate * value * (ctx.clock.millis() - start) } + .sum() + // Ceil to prevent consumed flops being rounded to 0 + remaining -= ceil(consumed).toLong() + + /* Test whether all flops have been consumed and the task is finished */ + if (remaining <= 0) { + ref.sendOnce(ProcessEvent.Exit(pid, 0)) + break + } + processAllocation(msg.resources, msg.until) + } + } + + inlet.close() + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt new file mode 100644 index 00000000..fc70b924 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt @@ -0,0 +1,90 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload.application + +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.ProcessingElement + +/** + * The process id (pid) is a reference to the application instance (process) that accepts messages of + * type [ProcessMessage]. + */ +typealias Pid = SendRef + +/** + * A message protocol for actors to communicate with task instances (called processes). + */ +sealed class ProcessMessage { + /** + * Indicate that the task should be installed to the specified machine. + * + * @property machine The machine to install the task. + * @property ref The reference to the machine instance. + */ + data class Setup(val machine: Machine, val ref: SendRef) : ProcessMessage() + + /** + * Indicate an allocation of compute resources on a machine for a certain duration. + * The task may assume that the reservation occurs after installation on the same machine. + * + * @property resources The cpu cores (and the utilization percentages) allocated for the task. + * @property until The point in time till which the reservation is valid. + */ + data class Allocation(val resources: Map, val until: Long) : ProcessMessage() +} + +/** + * The message protocol used by application instances respond to [ProcessMessage]s. + */ +sealed class ProcessEvent { + /** + * Indicate that the process is ready to start processing. + * + * @property pid A reference to the application instance. + */ + data class Ready(val pid: Pid) : ProcessEvent() + + /** + * Indicate the estimated resource utilization of the task until a specified point in time. + * + * @property pid A reference to the application instance of the represented utilization. + * @property utilization The utilization of the cpu cores as a percentage. + * @property until The point in time until which the utilization is valid. + */ + data class Consume( + val pid: Pid, + val utilization: Map, + val until: Long + ) : ProcessEvent() + + /** + * Indicate that a process has been terminated. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + data class Exit(val pid: Pid, val status: Int) : ProcessEvent() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt new file mode 100644 index 00000000..fefd6c88 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt @@ -0,0 +1,83 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An interface for supervising processes. + */ +interface ProcessSupervisor { + /** + * This method is invoked when the setup of an application completed successfully. + * + * @param pid The process id of the process that has been initialized. + */ + fun onReady(pid: Pid) {} + + /** + * This method is invoked when a process informs the machine that it is running with the + * estimated resource utilization until a specified point in time. + * + * @param pid The process id of the process that is running. + * @param utilization The utilization of the cpu cores as a percentage. + * @param until The point in time until which the utilization is valid. + */ + fun onConsume(pid: Pid, utilization: Map, until: Long) {} + + /** + * This method is invoked when a process exits. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + fun onExit(pid: Pid, status: Int) {} + + companion object { + /** + * Create the [Behavior] for a [ProcessSupervisor]. + * + * @param supervisor The supervisor to create the behavior for. + */ + suspend operator fun invoke(ctx: ProcessContext, supervisor: ProcessSupervisor, main: ReceiveRef) { + val inlet = ctx.listen(main) + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is ProcessEvent.Ready -> supervisor.onReady(msg.pid) + is ProcessEvent.Consume -> supervisor.onConsume(msg.pid, msg.utilization, msg.until) + is ProcessEvent.Exit -> supervisor.onExit(msg.pid, msg.status) + } + } + } + inlet.close() + } + } +} diff --git a/opendc/opendc-experiments-tpds/build.gradle.kts b/opendc/opendc-experiments-tpds/build.gradle.kts new file mode 100644 index 00000000..297e1d94 --- /dev/null +++ b/opendc/opendc-experiments-tpds/build.gradle.kts @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Experiments for the TPDS paper" + +/* Build configuration */ +plugins { + `kotlin-library-convention` + application +} + +application { + mainClassName = "com.atlarge.opendc.experiments.tpds.TestExperiment" +} + +dependencies { + api(project(":opendc:opendc-core")) + implementation(project(":opendc:opendc-format")) + implementation(project(":opendc:opendc-workflows")) + implementation(kotlin("stdlib")) + + runtimeOnly(project(":odcsim:odcsim-engine-omega")) + testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") + testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") +} diff --git a/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt b/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt new file mode 100644 index 00000000..ffd1604e --- /dev/null +++ b/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt @@ -0,0 +1,143 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.tpds + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.core.Broker +import com.atlarge.opendc.core.Model +import com.atlarge.opendc.core.PlatformMessage +import com.atlarge.opendc.core.find +import com.atlarge.opendc.core.services.provisioning.SimpleProvisioningService +import com.atlarge.opendc.core.services.resources.ResourceManagementService +import com.atlarge.opendc.core.zones +import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader +import com.atlarge.opendc.format.trace.gwf.GwfTraceReader +import com.atlarge.opendc.workflows.service.StageWorkflowScheduler +import com.atlarge.opendc.workflows.service.WorkflowEvent +import com.atlarge.opendc.workflows.service.WorkflowMessage +import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode +import com.atlarge.opendc.workflows.service.WorkflowService +import com.atlarge.opendc.workflows.service.stage.job.FifoJobSortingPolicy +import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy +import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy +import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDynamicFilterPolicy +import com.atlarge.opendc.workflows.service.stage.task.FifoTaskSortingPolicy +import com.atlarge.opendc.workflows.service.stage.task.FunctionalTaskEligibilityPolicy +import com.atlarge.opendc.workflows.workload.Job +import java.io.File +import java.util.ServiceLoader +import kotlin.math.max +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking + +/** + * Main entry point of the experiment. + */ +fun main(args: Array) { + if (args.isEmpty()) { + println("error: Please provide path to GWF trace") + return + } + + val scheduler = StageWorkflowScheduler( + mode = WorkflowSchedulerMode.Batch(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobSortingPolicy = FifoJobSortingPolicy(), + taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), + taskSortingPolicy = FifoTaskSortingPolicy(), + resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), + resourceSelectionPolicy = FirstFitResourceSelectionPolicy() + ) + + val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) + .use { it.read() } + .let { env -> + env.copy(platforms = env.platforms.map { platform -> + platform.copy(zones = platform.zones.map { zone -> + val services = zone.services + setOf(ResourceManagementService, SimpleProvisioningService, WorkflowService(scheduler)) + zone.copy(services = services) + }) + }) + } + + val broker = object : Broker { + override suspend fun invoke(ctx: ProcessContext, platforms: List>) { + coroutineScope { + val zones = platforms.first().zones() + val service = zones.values.first().find(WorkflowService) + val activeJobs = mutableSetOf() + val channel = ctx.open() + val outlet = ctx.connect(service) + val inlet = ctx.listen(channel.receive) + + launch { + val reader = GwfTraceReader(File(args[0])) + + while (reader.hasNext() && isActive) { + val (time, job) = reader.next() + delay(max(0, time - ctx.clock.millis())) + outlet.send(WorkflowMessage.Submit(job, channel.send)) + } + } + + var total = 0 + var finished = 0 + + while (isActive) { + when (val msg = inlet.receive()) { + is WorkflowEvent.JobSubmitted -> { + println("Job ${msg.job.uid} submitted") + total += 1 + } + is WorkflowEvent.JobStarted -> { + activeJobs += msg.job + } + is WorkflowEvent.JobFinished -> { + activeJobs -= msg.job + finished += 1 + println("Jobs $finished/$total finished (${msg.job.tasks.size} tasks)") + if (activeJobs.isEmpty()) + return@coroutineScope + } + } + } + } + } + } + + val model = Model(environment, listOf(broker)) + val factory = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = factory({ model(it) }, name = "sim") + + runBlocking { + system.run() + system.terminate() + } +} diff --git a/opendc/opendc-experiments-tpds/src/main/resources/env/setup-test.json b/opendc/opendc-experiments-tpds/src/main/resources/env/setup-test.json new file mode 100644 index 00000000..0965b250 --- /dev/null +++ b/opendc/opendc-experiments-tpds/src/main/resources/env/setup-test.json @@ -0,0 +1,36 @@ +{ + "name": "Experimental Setup 2", + "rooms": [ + { + "type": "SERVER", + "objects": [ + { + "type": "RACK", + "machines": [ + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]} + ] + }, + { + "type": "RACK", + "machines": [ + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]} + ] + } + ] + } + ] +} diff --git a/opendc/opendc-format/build.gradle.kts b/opendc/opendc-format/build.gradle.kts new file mode 100644 index 00000000..5f9ac1ec --- /dev/null +++ b/opendc/opendc-format/build.gradle.kts @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Library for reading common data formats for datacenter simulation" + +/* Build configuration */ +plugins { + `kotlin-library-convention` +} + +dependencies { + api(project(":opendc:opendc-core")) + api(project(":opendc:opendc-workflows")) + api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") + implementation(kotlin("stdlib")) + implementation(kotlin("reflect")) + + testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") + testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt new file mode 100644 index 00000000..6ca53a05 --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment + +import com.atlarge.opendc.core.Environment +import java.io.Closeable + +/** + * An interface for reading descriptions of datacenter environments into memory as [Environment]. + */ +interface EnvironmentReader : Closeable { + /** + * Read the description of the datacenter environment as [Environment]. + */ + fun read(): Environment +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt new file mode 100644 index 00000000..7afe9253 --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt @@ -0,0 +1,44 @@ +package com.atlarge.opendc.format.environment.sc18 + +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo + +/** + * A datacenter setup. + * + * @property name The name of the setup. + * @property rooms The rooms in the datacenter. + */ +internal data class Setup(val name: String, val rooms: List) + +/** + * A room in a datacenter. + * + * @property type The type of room in the datacenter. + * @property objects The objects in the room. + */ +internal data class Room(val type: String, val objects: List) + +/** + * An object in a [Room]. + * + * @property type The type of the room object. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) +internal sealed class RoomObject(val type: String) { + /** + * A rack in a server room. + * + * @property machines The machines in the rack. + */ + internal data class Rack(val machines: List) : RoomObject("RACK") +} + +/** + * A machine in the setup that consists of the specified CPU's represented as + * integer identifiers and ethernet speed. + * + * @property cpus The CPUs in the machine represented as integer identifiers. + */ +internal data class Machine(val cpus: List) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt new file mode 100644 index 00000000..ad111e74 --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -0,0 +1,95 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment.sc18 + +import com.atlarge.opendc.core.Cluster +import com.atlarge.opendc.core.Environment +import com.atlarge.opendc.core.Platform +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import com.atlarge.opendc.core.resources.compute.ProcessingUnit +import com.atlarge.opendc.core.resources.compute.host.Host +import com.atlarge.opendc.core.resources.compute.scheduling.SpaceSharedMachineScheduler +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import java.io.InputStream +import java.util.UUID + +/** + * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Datacenter + * Schedulers". + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { + /** + * The environment that was read from the file. + */ + private val environment: Environment + + init { + val setup = mapper.readValue(input) + val clusters = setup.rooms.mapIndexed { i, room -> + var counter = 0 + val hosts = room.objects.flatMap { roomObject -> + when (roomObject) { + is RoomObject.Rack -> { + roomObject.machines.map { machine -> + val cores = machine.cpus.flatMap { id -> + when (id) { + 1 -> List(4) { ProcessingElement(it, CPUS[0]) } + 2 -> List(2) { ProcessingElement(it, CPUS[1]) } + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } + } + Host(UUID.randomUUID(), "node-${counter++}", SpaceSharedMachineScheduler, cores) + } + } + } + } + Cluster(UUID.randomUUID(), "cluster-$i", hosts) + } + + val platform = Platform(UUID.randomUUID(), "sc18-platform", listOf( + Zone(UUID.randomUUID(), "zone", emptySet(), clusters) + )) + + environment = Environment(setup.name, null, listOf(platform)) + } + + override fun read(): Environment = environment + + override fun close() {} + + companion object { + val CPUS = arrayOf( + ProcessingUnit("Intel", 6, 6920, "Intel(R) Core(TM) i7-6920HQ CPU @ 4.10GHz", 4100.0, 1), + ProcessingUnit("Intel", 6, 6930, "Intel(R) Core(TM) i7-6920HQ CPU @ 3.50GHz", 3500.0, 1) + ) + } +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt new file mode 100644 index 00000000..d4ad33f7 --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt @@ -0,0 +1,54 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.opendc.core.workload.Workload + +/** + * An entry in a workload trace. + * + * @param T The shape of the workload in this entry. + */ +interface TraceEntry { + /** + * The time of submission of the workload. + */ + val submissionTime: Long + + /** + * The workload in this trace entry. + */ + val workload: T + + /** + * Extract the submission time from this entry. + */ + operator fun component1() = submissionTime + + /** + * Extract the workload from this entry. + */ + operator fun component2() = workload +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt new file mode 100644 index 00000000..6d29cdb4 --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.opendc.core.workload.Workload +import java.io.Closeable + +/** + * An interface for reading [Workload]s into memory. + * + * This interface must guarantee that the entries are delivered in order of submission time. + * + * @param T The shape of the workloads supported by this reader. + */ +interface TraceReader : Iterator>, Closeable diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt new file mode 100644 index 00000000..94ee6f31 --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.opendc.core.workload.Workload +import java.io.Closeable + +/** + * An interface for persisting workload traces (e.g. to disk). + * + * @param T The type of [Workload] supported by this writer. + */ +interface TraceWriter : Closeable { + /** + * Write an entry to the trace. + * + * Entries must be written in order of submission time. Failing to do so results in a [IllegalArgumentException]. + * + * @param submissionTime The time of submission of the workload. + * @param workload The workload to write to the trace. + */ + fun write(submissionTime: Long, workload: T) +} diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt new file mode 100644 index 00000000..407a5f4e --- /dev/null +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt @@ -0,0 +1,167 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.gwf + +import com.atlarge.opendc.core.User +import com.atlarge.opendc.core.workload.application.FlopsApplication +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task +import java.io.BufferedReader +import java.io.File +import java.io.InputStream +import java.util.UUID +import kotlin.math.max +import kotlin.math.min + +/** + * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more + * information about the format. + * + * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore + * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a + * different format for better performance. + * + * @param reader The buffered reader to read the trace with. + */ +class GwfTraceReader(reader: BufferedReader) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Create a [GwfTraceReader] instance from the specified [File]. + * + * @param file The file to read from. + */ + constructor(file: File) : this(file.bufferedReader()) + + /** + * Create a [GwfTraceReader] instance from the specified [InputStream]. + * + * @param input The input stream to read from. + */ + constructor(input: InputStream) : this(input.bufferedReader()) + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf() + val tasks = mutableMapOf() + val taskDependencies = mutableMapOf>() + + var workflowIdCol = 0 + var taskIdCol = 0 + var submitTimeCol = 0 + var runtimeCol = 0 + var coreCol = 0 + var dependencyCol = 0 + + try { + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(",") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + workflowIdCol = header["WorkflowID"]!! + taskIdCol = header["JobID"]!! + submitTimeCol = header["SubmitTime"]!! + runtimeCol = header["RunTime"]!! + coreCol = header["NProcs"]!! + dependencyCol = header["Dependencies"]!! + return@forEachIndexed + } + + val workflowId = values[workflowIdCol].trim().toLong() + val taskId = values[taskIdCol].trim().toLong() + val submitTime = values[submitTimeCol].trim().toLong() + val runtime = max(0, values[runtimeCol].trim().toLong()) + val cores = values[coreCol].trim().toInt() + val dependencies = values[dependencyCol].split(" ") + .filter { it.isNotEmpty() } + .map { it.trim().toLong() } + + val flops: Long = 4000 * runtime * cores + + val entry = entries.getOrPut(workflowId) { + TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "", UnnamedUser, HashSet())) + } + val workflow = entry.workload + val task = Task( + UUID(0L, taskId), "", + FlopsApplication(UUID(0L, taskId), "", workflow.owner, cores, flops), + HashSet() + ) + entry.submissionTime = min(entry.submissionTime, submitTime) + (workflow.tasks as MutableSet).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } + } finally { + reader.close() + } + + // Fix dependencies and dependents for all tasks + taskDependencies.forEach { (task, dependencies) -> + (task.dependencies as MutableSet).addAll(dependencies.map { taskId -> + tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") + }) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: Job + ) : TraceEntry +} diff --git a/opendc/opendc-workflows/build.gradle.kts b/opendc/opendc-workflows/build.gradle.kts new file mode 100644 index 00000000..6aa044e8 --- /dev/null +++ b/opendc/opendc-workflows/build.gradle.kts @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Workflow service for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-convention` +} + +dependencies { + api(project(":opendc:opendc-core")) + implementation(kotlin("stdlib")) + + testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") + testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt new file mode 100644 index 00000000..d4240421 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy +import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy +import kotlinx.coroutines.CoroutineScope + +/** + * A [WorkflowScheduler] that distributes work through a multi-stage process based on the Reference Architecture for + * Datacenter Scheduling. + */ +class StageWorkflowScheduler( + private val mode: WorkflowSchedulerMode, + private val jobAdmissionPolicy: JobAdmissionPolicy, + private val jobSortingPolicy: JobSortingPolicy, + private val taskEligibilityPolicy: TaskEligibilityPolicy, + private val taskSortingPolicy: TaskSortingPolicy, + private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, + private val resourceSelectionPolicy: ResourceSelectionPolicy +) : WorkflowScheduler { + override fun invoke( + ctx: ProcessContext, + self: WorkflowServiceRef, + coroutineScope: CoroutineScope, + lease: ProvisioningResponse.Lease + ): WorkflowSchedulerLogic { + return StageWorkflowSchedulerLogic(ctx, self, coroutineScope, lease, mode, jobAdmissionPolicy, + jobSortingPolicy, taskEligibilityPolicy, taskSortingPolicy, resourceDynamicFilterPolicy, resourceSelectionPolicy) + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt new file mode 100644 index 00000000..c6162f5e --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt @@ -0,0 +1,275 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendPort +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.resources.compute.MachineMessage +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver +import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState +import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.core.services.resources.HostView +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.Pid +import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy +import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +/** + * Logic of the [StageWorkflowScheduler]. + */ +class StageWorkflowSchedulerLogic( + ctx: ProcessContext, + self: WorkflowServiceRef, + coroutineScope: CoroutineScope, + lease: ProvisioningResponse.Lease, + private val mode: WorkflowSchedulerMode, + private val jobAdmissionPolicy: JobAdmissionPolicy, + private val jobSortingPolicy: JobSortingPolicy, + private val taskEligibilityPolicy: TaskEligibilityPolicy, + private val taskSortingPolicy: TaskSortingPolicy, + private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, + private val resourceSelectionPolicy: ResourceSelectionPolicy +) : WorkflowSchedulerLogic(ctx, self, coroutineScope, lease) { + + /** + * The incoming jobs ready to be processed by the scheduler. + */ + internal val incomingJobs: MutableSet = mutableSetOf() + + /** + * The active jobs in the system. + */ + internal val activeJobs: MutableSet = mutableSetOf() + + /** + * The running tasks by [Pid]. + */ + internal val taskByPid = mutableMapOf() + + /** + * The available processor cores on the leased machines. + */ + internal val machineCores: MutableMap = HashMap() + + private val brokers: MutableMap, SendPort> = HashMap() + private val channel = ctx.open() + + init { + lease.hosts.forEach { machineCores[it] = it.host.cores.count() } + coroutineScope.launch { + ProcessObserver(ctx, this@StageWorkflowSchedulerLogic, channel.receive) + } + } + + override suspend fun submit(job: Job, handler: SendRef) { + val broker = brokers.getOrPut(handler) { ctx.connect(handler) } + + // J1 Incoming Jobs + val jobInstance = JobView(job, handler) + val instances = job.tasks.associateWith { + TaskView(jobInstance, it) + } + + for ((task, instance) in instances) { + instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) + task.dependencies.forEach { + instances[it]!!.dependents.add(instance) + } + + // If the task has no dependency, it is a root task and can immediately be evaluated + if (instance.isRoot) { + instance.state = ProcessState.READY + } + } + + jobInstance.tasks = instances.values.toMutableSet() + incomingJobs += jobInstance + broker.send(WorkflowEvent.JobSubmitted(self, job, ctx.clock.millis())) + requestCycle() + } + + private var next: kotlinx.coroutines.Job? = null + + /** + * Indicate to the scheduler that a scheduling cycle is needed. + */ + private fun requestCycle() { + when (mode) { + is WorkflowSchedulerMode.Interactive -> { + coroutineScope.launch { + schedule() + } + } + is WorkflowSchedulerMode.Batch -> { + if (next == null) { + val job = coroutineScope.launch { + delay(mode.quantum) + schedule() + } + next = job + job.invokeOnCompletion { + next = null + } + } + } + } + } + + /** + * Perform a scheduling cycle immediately. + */ + override suspend fun schedule() { + // J2 Create list of eligible jobs + jobAdmissionPolicy.startCycle(this) + val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) } + for (jobInstance in eligibleJobs) { + incomingJobs -= jobInstance + activeJobs += jobInstance + brokers.getValue(jobInstance.broker).send(WorkflowEvent.JobStarted(self, jobInstance.job, ctx.clock.millis())) + } + + // J3 Sort jobs on criterion + val sortedJobs = jobSortingPolicy(this, activeJobs) + + // J4 Per job + for (jobInstance in sortedJobs) { + // T1 Create list of eligible tasks + taskEligibilityPolicy.startCycle(this) + val eligibleTasks = jobInstance.tasks.filter { taskEligibilityPolicy.isEligible(this, it) } + + // T2 Sort tasks on criterion + val sortedTasks = taskSortingPolicy(this, eligibleTasks) + + // T3 Per task + for (instance in sortedTasks) { + val hosts = resourceDynamicFilterPolicy(this, lease.hosts, instance) + val host = resourceSelectionPolicy.select(this, hosts, instance) + + if (host != null) { + // T4 Submit task to machine + host.ref.sendOnce(MachineMessage.Submit(instance.task.application, instance, channel.send)) + instance.host = host + instance.state = ProcessState.RUNNING // Assume the application starts running + machineCores.merge(host, instance.task.application.cores, Int::minus) + } else { + return + } + } + } + } + + override fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) { + val task = key as TaskView + task.pid = pid + taskByPid[pid] = task + + brokers.getValue(task.job.broker).send(WorkflowEvent.TaskStarted(self, task.job.job, task.task, ctx.clock.millis())) + } + + override fun onTermination(instance: MachineRef, pid: Pid, status: Int) { + val task = taskByPid.remove(pid) ?: throw IllegalStateException() + + val job = task.job + task.state = ProcessState.TERMINATED + job.tasks.remove(task) + machineCores.merge(task.host!!, task.task.application.cores, Int::plus) + brokers.getValue(job.broker).send(WorkflowEvent.TaskFinished(self, job.job, task.task, status, ctx.clock.millis())) + + if (job.isFinished) { + activeJobs -= job + brokers.getValue(job.broker).send(WorkflowEvent.JobFinished(self, job.job, ctx.clock.millis())) + } + + requestCycle() + } + + class JobView(val job: Job, val broker: SendRef) { + /** + * A flag to indicate whether this job is finished. + */ + val isFinished: Boolean + get() = tasks.isEmpty() + + lateinit var tasks: MutableSet + } + + class TaskView(val job: JobView, val task: Task) { + /** + * The dependencies of this task. + */ + val dependencies = HashSet() + + /** + * The dependents of this task. + */ + val dependents = HashSet() + + /** + * A flag to indicate whether this workflow task instance is a workflow root. + */ + val isRoot: Boolean + get() = dependencies.isEmpty() + + var state: ProcessState = ProcessState.CREATED + set(value) { + field = value + + // Mark the process as terminated in the graph + if (value == ProcessState.TERMINATED) { + markTerminated() + } + } + + var pid: Pid? = null + + var host: HostView? = null + + /** + * Mark the specified [TaskView] as terminated. + */ + private fun markTerminated() { + for (dependent in dependents) { + dependent.dependencies.remove(this) + + if (dependent.isRoot) { + dependent.state = ProcessState.READY + } + } + } + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt new file mode 100644 index 00000000..6d6d4179 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse +import kotlinx.coroutines.CoroutineScope + +/** + * A factory interface for constructing a [WorkflowSchedulerLogic]. + */ +interface WorkflowScheduler { + /** + * Construct a [WorkflowSchedulerLogic] in the given [ActorContext]. + * + * @param ctx The context in which the scheduler runs. + * @param timers The timer scheduler to use. + * @param lease The resource lease to use. + */ + operator fun invoke( + ctx: ProcessContext, + self: WorkflowServiceRef, + coroutineScope: CoroutineScope, + lease: ProvisioningResponse.Lease + ): WorkflowSchedulerLogic +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt new file mode 100644 index 00000000..0b3ba828 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt @@ -0,0 +1,56 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver +import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.CoroutineScope + +/** + * A workflow scheduler interface that schedules jobs across machines. + * + * @property ctx The context in which the scheduler runs. + * @property timers The timer scheduler to use. + * @property lease The resource lease to use. + */ +abstract class WorkflowSchedulerLogic( + protected val ctx: ProcessContext, + protected val self: WorkflowServiceRef, + protected val coroutineScope: CoroutineScope, + protected val lease: ProvisioningResponse.Lease +) : ProcessObserver { + /** + * Submit the specified workflow for scheduling. + */ + abstract suspend fun submit(job: Job, handler: SendRef) + + /** + * Trigger an immediate scheduling cycle. + */ + abstract suspend fun schedule() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt new file mode 100644 index 00000000..f5060c5c --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +/** + * The operating mode of a workflow scheduler. + */ +sealed class WorkflowSchedulerMode { + /** + * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. + */ + object Interactive : WorkflowSchedulerMode() + + /** + * A batch scheduler triggers a scheduling cycle every time quantum if needed. + */ + data class Batch(val quantum: Long) : WorkflowSchedulerMode() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt new file mode 100644 index 00000000..bed6b93b --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -0,0 +1,184 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.ask +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.ZoneMessage +import com.atlarge.opendc.core.find +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.services.AbstractService +import com.atlarge.opendc.core.services.Service +import com.atlarge.opendc.core.services.ServiceProvider +import com.atlarge.opendc.core.services.provisioning.ProvisioningMessage +import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.core.services.provisioning.ProvisioningService +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive + +/** + * A service for cloud workflow management. + * + * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al. + */ +class WorkflowService(val scheduler: WorkflowScheduler) : ServiceProvider { + override val uid: UUID = UUID.randomUUID() + override val name: String = "workflows" + override val provides: Set> = setOf(WorkflowService) + override val dependencies: Set> = setOf(ProvisioningService) + + /** + * Build the runtime [Behavior] for the workflow service, responding to messages of shape [WorkflowMessage]. + */ + override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef, main: Channel) { + coroutineScope { + val inlet = ctx.listen(main.receive) + val provisioner = zoneRef.find(ProvisioningService) + // Wait for 0.1 sec before asking the provisioner to allow it to initialize. Will return empty response if asked + // immediately. + delay(10) + val lease: ProvisioningResponse.Lease = provisioner.ask { ProvisioningMessage.Request(Int.MAX_VALUE, it) } + val schedulerLogic = scheduler(ctx, main.send, this, lease) + + while (isActive) { + when (val msg = inlet.receive()) { + is WorkflowMessage.Submit -> { + schedulerLogic.submit(msg.job, msg.broker) + } + is MachineEvent.Submitted -> { + schedulerLogic.onSubmission(msg.instance, msg.application, msg.key, msg.pid) + } + is MachineEvent.Terminated -> { + schedulerLogic.onTermination(msg.instance, msg.pid, msg.status) + } + } + } + } + } + + companion object : AbstractService(UUID.randomUUID(), "workflows") +} + +/** + * A reference to the workflow service instance. + */ +typealias WorkflowServiceRef = SendRef + +/** + * A message protocol for communicating to the workflow service. + */ +sealed class WorkflowMessage { + /** + * Submit the specified [Job] to the workflow service for scheduling. + * + * @property job The workflow to submit for scheduling. + * @property broker The broker that has submitted this workflow on behalf of a user and that needs to be kept + * up-to-date. + */ + data class Submit(val job: Job, val broker: SendRef) : WorkflowMessage() +} + +/** + * A message protocol used by the workflow service to respond to [WorkflowMessage]s. + */ +sealed class WorkflowEvent { + /** + * Indicate that the specified [Job] was submitted to the workflow service. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that has been submitted. + * @property time A timestamp of the moment the job was received. + */ + data class JobSubmitted( + val service: WorkflowServiceRef, + val job: Job, + val time: Long + ) : WorkflowEvent() + + /** + * Indicate that the specified [Job] has become active. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that has been submitted. + * @property time A timestamp of the moment the job started. + */ + data class JobStarted( + val service: WorkflowServiceRef, + val job: Job, + val time: Long + ) : WorkflowEvent() + + /** + * Indicate that the specified [Task] has started processing. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that contains this task. + * @property task The task that has started processing. + * @property time A timestamp of the moment the task started. + */ + data class TaskStarted( + val service: WorkflowServiceRef, + val job: Job, + val task: Task, + val time: Long + ) : WorkflowEvent() + + /** + * Indicate that the specified [Task] has started processing. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that contains this task. + * @property task The task that has started processing. + * @property status The exit code of the task, where zero means successful. + * @property time A timestamp of the moment the task finished. + */ + data class TaskFinished( + val service: WorkflowServiceRef, + val job: Job, + val task: Task, + val status: Int, + val time: Long + ) : WorkflowEvent() + + /** + * Indicate that the specified [Job] has finished processing. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that has finished processing. + * @property time A timestamp of the moment the task finished. + */ + data class JobFinished( + val service: WorkflowServiceRef, + val job: Job, + val time: Long + ) : WorkflowEvent() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt new file mode 100644 index 00000000..333ed35a --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue. + */ +class FifoJobSortingPolicy : JobSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + jobs: Collection + ): List = jobs.toList() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt new file mode 100644 index 00000000..d3a5d9a6 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * A policy interface for admitting [StageWorkflowSchedulerLogic.JobView]s to a scheduling cycle. + */ +interface JobAdmissionPolicy { + /** + * A method that is invoked at the start of each scheduling cycle. + * + * @param scheduler The scheduler that started the cycle. + */ + fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + + /** + * Determine whether the specified [StageWorkflowSchedulerLogic.JobView] should be admitted to the scheduling cycle. + * + * @param scheduler The scheduler that should admit or reject the job. + * @param job The workflow that has been submitted. + * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise. + */ + fun shouldAdmit(scheduler: StageWorkflowSchedulerLogic, job: StageWorkflowSchedulerLogic.JobView): Boolean +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt new file mode 100644 index 00000000..ada3e693 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * A policy interface for ordering admitted workflows in the scheduling queue. + */ +interface JobSortingPolicy { + /** + * Sort the given collection of jobs on a given criterion. + * + * @param scheduler The scheduler that started the cycle. + * @param jobs The collection of tasks that should be sorted. + * @return The sorted list of jobs. + */ + operator fun invoke( + scheduler: StageWorkflowSchedulerLogic, + jobs: Collection + ): List +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt new file mode 100644 index 00000000..f877403b --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * A [JobAdmissionPolicy] that admits all jobs. + */ +object NullJobAdmissionPolicy : JobAdmissionPolicy { + /** + * Admit every submitted job. + */ + override fun shouldAdmit( + scheduler: StageWorkflowSchedulerLogic, + job: StageWorkflowSchedulerLogic.JobView + ): Boolean = true +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt new file mode 100644 index 00000000..30d5c456 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import kotlin.random.Random + +/** + * The [RandomJobSortingPolicy] sorts tasks randomly. + * + * @property random The [Random] instance to use when sorting the list of tasks. + */ +class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + jobs: Collection + ): List = jobs.shuffled(random) +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt new file mode 100644 index 00000000..c3307063 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.core.services.resources.HostView +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * A [ResourceSelectionPolicy] that selects the first machine that is available. + */ +class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { + override fun select( + scheduler: StageWorkflowSchedulerLogic, + machines: List, + task: StageWorkflowSchedulerLogic.TaskView + ): HostView? = + machines.firstOrNull() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt new file mode 100644 index 00000000..d742f842 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.core.services.resources.HostView +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for + * the task. + */ +class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + machines: List, + task: StageWorkflowSchedulerLogic.TaskView + ): List { + return machines + .filter { scheduler.machineCores[it] ?: 0 >= task.task.application.cores } + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt new file mode 100644 index 00000000..8a3b5a1e --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.core.services.resources.HostView +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding + * a list of resources with sufficient resource-capacities, based on fixed or dynamic requirements, and on predicted or + * monitored information about processing unit availability, memory occupancy, etc. + */ +interface ResourceDynamicFilterPolicy { + /** + * Filter the list of machines based on dynamic information. + * + * @param scheduler The scheduler to filter the machines. + * @param machines The list of machines in the system. + * @param task The task that is to be scheduled. + * @return The machines on which the task can be scheduled. + */ + operator fun invoke( + scheduler: StageWorkflowSchedulerLogic, + machines: List, + task: StageWorkflowSchedulerLogic.TaskView + ): List +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt new file mode 100644 index 00000000..90b2873c --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.core.services.resources.HostView +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected + * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. + */ +interface ResourceSelectionPolicy { + /** + * Select a machine on which the task should be scheduled. + * + * @param scheduler The scheduler to select the machine. + * @param machines The list of machines in the system. + * @param task The task that is to be scheduled. + * @return The selected machine or `null` if no machine could be found. + */ + fun select( + scheduler: StageWorkflowSchedulerLogic, + machines: List, + task: StageWorkflowSchedulerLogic.TaskView + ): HostView? +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt new file mode 100644 index 00000000..48a1a50d --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue. + */ +class FifoTaskSortingPolicy : TaskSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + tasks: Collection + ): List = tasks.toList() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt new file mode 100644 index 00000000..1672633e --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. + */ +class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy { + override fun isEligible( + scheduler: StageWorkflowSchedulerLogic, + task: StageWorkflowSchedulerLogic.TaskView + ): Boolean = task.state == ProcessState.READY +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt new file mode 100644 index 00000000..36ef3a50 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import kotlin.random.Random + +/** + * The [RandomTaskSortingPolicy] sorts tasks randomly. + * + * @property random The [Random] instance to use when sorting the list of tasks. + */ +class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + tasks: Collection + ): List = tasks.shuffled(random) +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt new file mode 100644 index 00000000..19f0240b --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * A policy interface for determining the eligibility of tasks in a scheduling cycle. + */ +interface TaskEligibilityPolicy { + /** + * A method that is invoked at the start of each scheduling cycle. + * + * @param scheduler The scheduler that started the cycle. + */ + fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + + /** + * Determine whether the specified [StageWorkflowSchedulerLogic.TaskView] is eligible to be scheduled. + * + * @param scheduler The scheduler that is determining whether the task is eligible. + * @param task The task instance to schedule. + * @return `true` if the task eligible to be scheduled, `false` otherwise. + */ + fun isEligible(scheduler: StageWorkflowSchedulerLogic, task: StageWorkflowSchedulerLogic.TaskView): Boolean +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt new file mode 100644 index 00000000..6a65ed69 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic + +/** + * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the + * scheduler with a sorted list of tasks to schedule. + */ +interface TaskSortingPolicy { + /** + * Sort the given list of tasks on a given criterion. + * + * @param scheduler The scheduler that is sorting the tasks. + * @param tasks The collection of tasks that should be sorted. + * @return The sorted list of tasks. + */ + operator fun invoke( + scheduler: StageWorkflowSchedulerLogic, + tasks: Collection + ): List +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt new file mode 100644 index 00000000..dece875c --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.workload + +import com.atlarge.opendc.core.User +import com.atlarge.opendc.core.workload.Workload +import java.util.UUID + +/** + * A workload that represents a directed acyclic graph (DAG) of tasks with control and data dependencies between tasks. + * + * @property uid A unique identified of this workflow. + * @property name The name of this workflow. + * @property owner The owner of the workflow. + * @property tasks The tasks that are part of this workflow. + */ +data class Job( + override val uid: UUID, + override val name: String, + override val owner: User, + val tasks: Set +) : Workload { + override fun equals(other: Any?): Boolean = other is Job && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt new file mode 100644 index 00000000..25fe7348 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.workload + +import com.atlarge.opendc.core.Identity +import com.atlarge.opendc.core.workload.application.Application +import java.util.UUID + +/** + * A stage of a [Job]. + * + * @property uid A unique identified of this task. + * @property name The name of this task. + * @property application The application to run as part of this workflow task. + * @property dependencies The dependencies of this task in order for it to execute. + */ +data class Task( + override val uid: UUID, + override val name: String, + val application: Application, + val dependencies: Set +) : Identity { + override fun equals(other: Any?): Boolean = other is Task && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 72d85c80..3f10026a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -25,3 +25,7 @@ rootProject.name = "opendc-simulator" include(":odcsim:odcsim-api") include(":odcsim:odcsim-engine-omega") +include(":opendc:opendc-core") +include(":opendc:opendc-format") +include(":opendc:opendc-workflows") +include(":opendc:opendc-experiments-tpds") -- cgit v1.2.3 From f5913d767d17d1dc676fa95174f5d95d0ea12ade Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 11 Feb 2020 14:16:29 +0100 Subject: Update Environment.kt Add missing dot --- .../opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt index 5bdff0b6..62309bf9 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt @@ -30,7 +30,7 @@ package com.atlarge.opendc.core * scheduled maintenance, allocation and other constraints. * * @property name The name of the environment. - * @property description A small textual description about the environment that is being modeled + * @property description A small textual description about the environment that is being modeled. * @property platforms The cloud platforms (such as AWS or GCE) in this environment. */ data class Environment(val name: String, val description: String?, val platforms: List) -- cgit v1.2.3 From 56ff9a31c59f271fb5f40bb9d3bed9a6d5b48a6f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Feb 2020 14:23:00 +0100 Subject: docs: Elaborate on unique ids of events --- .../kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 767e139a..11dae528 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -90,6 +90,11 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private val channels: MutableSet> = HashSet() + /** + * A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence + * (but at the same time) may be differently ordered in the internal priority queue (queue) since it does not + * guarantee insertion order. + */ private var nextId: Long = 0 /** -- cgit v1.2.3 From 8e16b076e9c7c8c086446853e48dfff80cb45ca1 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 11 Feb 2020 14:26:30 +0100 Subject: Update Zone.kt Add missing dots --- opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt index 07361423..6c3ffd02 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt @@ -44,7 +44,7 @@ import kotlinx.coroutines.isActive * This class models *only* the static information of a zone, with dynamic information being contained within the zone's * actor. During runtime, it's actor acts as a registry for all the cloud services provided by the zone. * - * @property uid The unique identifier of this availability zone + * @property uid The unique identifier of this availability zone. * @property name The name of the zone within its platform. * @property services The initial set of services provided by the zone. * @property clusters The clusters of machines in this zone. @@ -123,7 +123,7 @@ data class Zone( } /** - * A message protocol for communicating with the service registry + * A message protocol for communicating with the service registry. */ sealed class ZoneMessage { /** -- cgit v1.2.3