From 7c8a3fd217418c6f956a9315eb13c2a31a9f85a0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 May 2019 22:37:35 +0200 Subject: feat: Add initial version of OpenDC simulation model This change adds the initial version of the port of the OpenDC simulation model to version 2.x of the simulator. The simulation model has been reworked to support immutability and event-driven simulation, with speed-ups up to 75x. --- opendc-core/build.gradle.kts | 51 +++++ .../main/kotlin/com/atlarge/opendc/model/Broker.kt | 40 ++++ .../kotlin/com/atlarge/opendc/model/Cluster.kt | 56 +++++ .../kotlin/com/atlarge/opendc/model/Environment.kt | 36 +++ .../kotlin/com/atlarge/opendc/model/Identity.kt | 42 ++++ .../main/kotlin/com/atlarge/opendc/model/Model.kt | 71 ++++++ .../kotlin/com/atlarge/opendc/model/Platform.kt | 103 +++++++++ .../main/kotlin/com/atlarge/opendc/model/User.kt | 35 +++ .../main/kotlin/com/atlarge/opendc/model/Zone.kt | 212 ++++++++++++++++++ .../opendc/model/resources/compute/Machine.kt | 105 +++++++++ .../model/resources/compute/MachineStatus.kt | 33 +++ .../model/resources/compute/ProcessingElement.kt | 33 +++ .../model/resources/compute/ProcessingUnit.kt | 44 ++++ .../opendc/model/resources/compute/host/Host.kt | 86 +++++++ .../compute/scheduling/MachineScheduler.kt | 48 ++++ .../compute/scheduling/MachineSchedulerLogic.kt | 64 ++++++ .../compute/scheduling/ProcessObserver.kt | 68 ++++++ .../resources/compute/scheduling/ProcessState.kt | 50 +++++ .../resources/compute/scheduling/ProcessView.kt | 53 +++++ .../scheduling/SpaceSharedMachineScheduler.kt | 178 +++++++++++++++ .../compute/supervision/MachineSupervisionEvent.kt | 49 ++++ .../compute/supervision/MachineSupervisor.kt | 67 ++++++ .../com/atlarge/opendc/model/services/Service.kt | 47 ++++ .../atlarge/opendc/model/services/ServiceMap.kt | 49 ++++ .../opendc/model/services/ServiceProvider.kt | 64 ++++++ .../services/provisioning/ProvisioningService.kt | 77 +++++++ .../provisioning/SimpleProvisioningService.kt | 125 +++++++++++ .../opendc/model/services/resources/HostView.kt | 42 ++++ .../resources/ResourceManagementService.kt | 121 ++++++++++ .../com/atlarge/opendc/model/workload/Workload.kt | 39 ++++ .../model/workload/application/Application.kt | 47 ++++ .../model/workload/application/FlopsApplication.kt | 164 ++++++++++++++ .../opendc/model/workload/application/Process.kt | 91 ++++++++ .../workload/application/ProcessSupervisor.kt | 79 +++++++ .../workload/application/FlopsApplicationTest.kt | 128 +++++++++++ opendc-experiments-tpds/build.gradle.kts | 50 +++++ .../opendc/experiments/tpds/TestExperiment.kt | 148 ++++++++++++ .../src/main/resources/env/setup-test.json | 36 +++ .../src/main/resources/log4j2.xml | 52 +++++ opendc-format-gwf/build.gradle.kts | 48 ++++ .../opendc/format/trace/gwf/GwfTraceReader.kt | 168 ++++++++++++++ .../opendc/format/trace/gwf/GwfTraceReaderTest.kt | 41 ++++ opendc-format-gwf/src/test/resources/trace.gwf | 4 + opendc-format-sc18/build.gradle.kts | 50 +++++ .../opendc/format/environment/sc18/Model.kt | 44 ++++ .../environment/sc18/Sc18EnvironmentReader.kt | 95 ++++++++ opendc-format/build.gradle.kts | 51 +++++ .../opendc/format/environment/EnvironmentReader.kt | 38 ++++ .../com/atlarge/opendc/format/trace/TraceEntry.kt | 55 +++++ .../com/atlarge/opendc/format/trace/TraceReader.kt | 37 +++ .../com/atlarge/opendc/format/trace/TraceWriter.kt | 46 ++++ opendc-workflows/build.gradle.kts | 51 +++++ .../services/workflows/StageWorkflowScheduler.kt | 58 +++++ .../workflows/StageWorkflowSchedulerLogic.kt | 248 +++++++++++++++++++++ .../model/services/workflows/WorkflowScheduler.kt | 47 ++++ .../services/workflows/WorkflowSchedulerLogic.kt | 55 +++++ .../services/workflows/WorkflowSchedulerMode.kt | 42 ++++ .../model/services/workflows/WorkflowService.kt | 193 ++++++++++++++++ .../workflows/stages/job/FifoJobSortingPolicy.kt | 37 +++ .../workflows/stages/job/JobAdmissionPolicy.kt | 48 ++++ .../workflows/stages/job/JobSortingPolicy.kt | 44 ++++ .../workflows/stages/job/NullJobAdmissionPolicy.kt | 40 ++++ .../workflows/stages/job/RandomJobSortingPolicy.kt | 40 ++++ .../resources/FirstFitResourceSelectionPolicy.kt | 40 ++++ .../FunctionalResourceDynamicFilterPolicy.kt | 43 ++++ .../resources/ResourceDynamicFilterPolicy.kt | 49 ++++ .../stages/resources/ResourceSelectionPolicy.kt | 48 ++++ .../workflows/stages/task/FifoTaskSortingPolicy.kt | 37 +++ .../stages/task/FunctionalTaskEligibilityPolicy.kt | 38 ++++ .../stages/task/RandomTaskSortingPolicy.kt | 40 ++++ .../workflows/stages/task/TaskEligibilityPolicy.kt | 48 ++++ .../workflows/stages/task/TaskSortingPolicy.kt | 45 ++++ .../atlarge/opendc/model/workload/workflow/Job.kt | 48 ++++ .../atlarge/opendc/model/workload/workflow/Task.kt | 48 ++++ settings.gradle.kts | 6 + 75 files changed, 4953 insertions(+) create mode 100644 opendc-core/build.gradle.kts create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/Broker.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/Cluster.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/Environment.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/Identity.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/Model.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/Platform.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/User.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/Zone.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/Machine.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/MachineStatus.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingElement.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingUnit.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/host/Host.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineScheduler.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineSchedulerLogic.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessObserver.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessState.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessView.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/SpaceSharedMachineScheduler.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisionEvent.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisor.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/Service.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceMap.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceProvider.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/ProvisioningService.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/SimpleProvisioningService.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/HostView.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/ResourceManagementService.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/Workload.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Application.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplication.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Process.kt create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/ProcessSupervisor.kt create mode 100644 opendc-core/src/test/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplicationTest.kt create mode 100644 opendc-experiments-tpds/build.gradle.kts create mode 100644 opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt create mode 100644 opendc-experiments-tpds/src/main/resources/env/setup-test.json create mode 100644 opendc-experiments-tpds/src/main/resources/log4j2.xml create mode 100644 opendc-format-gwf/build.gradle.kts create mode 100644 opendc-format-gwf/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt create mode 100644 opendc-format-gwf/src/test/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReaderTest.kt create mode 100644 opendc-format-gwf/src/test/resources/trace.gwf create mode 100644 opendc-format-sc18/build.gradle.kts create mode 100644 opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt create mode 100644 opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt create mode 100644 opendc-format/build.gradle.kts create mode 100644 opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt create mode 100644 opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt create mode 100644 opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt create mode 100644 opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt create mode 100644 opendc-workflows/build.gradle.kts create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt create mode 100644 opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt diff --git a/opendc-core/build.gradle.kts b/opendc-core/build.gradle.kts new file mode 100644 index 00000000..c89e27e6 --- /dev/null +++ b/opendc-core/build.gradle.kts @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":odcsim-core")) + + implementation(kotlin("stdlib")) + + testImplementation(project(":odcsim-testkit")) + testImplementation(project(":odcsim-engine-omega")) + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") + testRuntimeOnly("org.slf4j:slf4j-simple:1.7.25") + testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.0.0") +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Broker.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Broker.kt new file mode 100644 index 00000000..0e2adb40 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Broker.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.Behavior + +/** + * A broker acting on the various cloud platforms on behalf of the user. + */ +interface Broker { + /** + * Build the runtime behavior of the [Broker]. + * + * @param platforms A list of available cloud platforms. + * @return The runtime behavior of the broker. + */ + operator fun invoke(platforms: List): Behavior<*> +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Cluster.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Cluster.kt new file mode 100644 index 00000000..1ed11e87 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Cluster.kt @@ -0,0 +1,56 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.empty +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unsafeCast +import com.atlarge.opendc.model.resources.compute.host.Host +import com.atlarge.opendc.model.services.resources.ResourceManagerRef +import java.util.UUID + +/** + * A logical grouping of heterogeneous hosts and primary storage within a zone. + * + * @property uid The unique identifier of the cluster. + * @property name The name of this cluster. + * @property hosts The machines in this cluster. + */ +data class Cluster(override val uid: UUID, override val name: String, val hosts: List) : Identity { + /** + * Build the runtime [Behavior] of this cluster of hosts. + * + * @param manager The manager of the cluster. + */ + operator fun invoke(manager: ResourceManagerRef): Behavior = setup { ctx -> + // Launch all hosts in the cluster + for (host in hosts) { + ctx.spawn(host(manager.unsafeCast()), name = host.name) + } + + empty() + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Environment.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Environment.kt new file mode 100644 index 00000000..b7fe33db --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Environment.kt @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +/** + * A description of a large-scale computing environment. This description includes including key size and topology + * information of the environment, types of resources, but also various operational and management rules such as + * scheduled maintenance, allocation and other constraints. + * + * @property name The name of the environment. + * @property description A small textual description about the environment that is being modeled + * @property platforms The cloud platforms (such as AWS or GCE) in this environment. + */ +data class Environment(val name: String, val description: String?, val platforms: List) diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Identity.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Identity.kt new file mode 100644 index 00000000..2455d138 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Identity.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import java.util.UUID + +/** + * An object that has a unique identity. + */ +interface Identity { + /** + * A unique, opaque, system-generated value, representing the object. + */ + val uid: UUID + + /** + * A non-empty, human-readable string representing the object. + */ + val name: String +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Model.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Model.kt new file mode 100644 index 00000000..098bfbde --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Model.kt @@ -0,0 +1,71 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Terminated +import com.atlarge.odcsim.receiveSignal +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.stopped +import com.atlarge.odcsim.unhandled + +/** + * A simulation model for large-scale simulation of datacenter infrastructure, built with the *odcsim* API. + * + * @property environment The environment in which brokers operate. + * @property brokers The brokers acting on the cloud platforms. + */ +data class Model(val environment: Environment, val brokers: List) { + /** + * Build the runtime behavior of the universe. + */ + operator fun invoke(): Behavior = setup { ctx -> + // Setup the environment + val platforms = environment.platforms.map { platform -> + ctx.spawn(platform(), name = platform.name) + } + + // Launch all brokers + val remaining = mutableSetOf>() + for (broker in brokers) { + val ref = ctx.spawnAnonymous(broker(platforms)) + ctx.watch(ref) + remaining += ref + } + + receiveSignal { _, signal -> + when (signal) { + is Terminated -> { + remaining -= signal.ref + if (remaining.isEmpty()) stopped() else same() + } + else -> + unhandled() + } + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Platform.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Platform.kt new file mode 100644 index 00000000..129ee3a9 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Platform.kt @@ -0,0 +1,103 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.coroutines.actorContext +import com.atlarge.odcsim.coroutines.dsl.ask +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import java.util.UUID + +/** + * A representation of a cloud platform such as Amazon Web Services (AWS), Microsoft Azure or Google Cloud. + * + * @property uid The unique identifier of this datacenter. + * @property name the name of the platform. + * @property zones The availability zones available on this platform. + */ +data class Platform(override val uid: UUID, override val name: String, val zones: List) : Identity { + /** + * Build the runtime [Behavior] of this cloud platform. + */ + operator fun invoke(): Behavior = setup { ctx -> + ctx.log.info("Starting cloud platform {} [{}] with {} zones", name, uid, zones.size) + + // Launch all zones of the cloud platform + val zoneInstances = zones.associateWith { zone -> + ctx.spawn(zone(), name = zone.name) + } + + receiveMessage { msg -> + when (msg) { + is PlatformMessage.ListZones -> { + ctx.send(msg.replyTo, PlatformResponse.Zones(ctx.self, zoneInstances.mapKeys { it.key.name })) + same() + } + } + } + } +} + +/** + * A reference to the actor managing the zone. + */ +typealias PlatformRef = ActorRef + +/** + * A message protocol for communicating with a cloud platform. + */ +sealed class PlatformMessage { + /** + * Request the available zones on this platform. + * + * @property replyTo The actor address to send the response to. + */ + data class ListZones(val replyTo: ActorRef) : PlatformMessage() +} + +/** + * A message protocol used by platform actors to respond to [PlatformMessage]s. + */ +sealed class PlatformResponse { + /** + * The zones available on this cloud platform. + * + * @property platform The reference to the cloud platform these are the zones of. + * @property zones The zones in this cloud platform. + */ + data class Zones(val platform: PlatformRef, val zones: Map) : PlatformResponse() +} + +/** + * Retrieve the available zones of a platform. + */ +suspend fun PlatformRef.zones(): Map { + val ctx = actorContext() + val zones: PlatformResponse.Zones = ctx.ask(this) { PlatformMessage.ListZones(it) } + return zones.zones +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/User.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/User.kt new file mode 100644 index 00000000..502f7a74 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/User.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +/** + * A user of the cloud network. + */ +interface User : Identity { + /** + * The name of the user. + */ + override val name: String +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Zone.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Zone.kt new file mode 100644 index 00000000..b1d00dac --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Zone.kt @@ -0,0 +1,212 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.Terminated +import com.atlarge.odcsim.coroutines.actorContext +import com.atlarge.odcsim.coroutines.dsl.ask +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unhandled +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.ServiceProvider +import java.util.ArrayDeque +import java.util.UUID + +/** + * An isolated location within a datacenter region from which public cloud services operate, roughly equivalent to a + * single datacenter. Zones contain one or more clusters and secondary storage. + * + * This class models *only* the static information of a zone, with dynamic information being contained within the zone's + * actor. During runtime, it's actor acts as a registry for all the cloud services provided by the zone. + * + * @property uid The unique identifier of this availability zone + * @property name The name of the zone within its platform. + * @property services The initial set of services provided by the zone. + * @property clusters The clusters of machines in this zone. + */ +data class Zone( + override val uid: UUID, + override val name: String, + val services: Set, + val clusters: List +) : Identity { + /** + * Build the runtime [Behavior] of this datacenter. + */ + operator fun invoke(): Behavior = setup { ctx -> + ctx.log.info("Starting zone {} [{}]", name, uid) + + // Launch all services of the zone + val instances: MutableMap, ActorRef<*>> = mutableMapOf() + validateDependencies(services) + + for (provider in services) { + val ref = ctx.spawn(provider(this, ctx.self), name = "${provider.name}-${provider.uid}") + ctx.watch(ref) + provider.provides.forEach { instances[it] = ref } + } + + object : ReceivingBehavior() { + override fun receive(ctx: ActorContext, msg: ZoneMessage): Behavior { + return when (msg) { + is ZoneMessage.Find -> { + ctx.send(msg.replyTo, ZoneResponse.Listing(ctx.self, msg.key, instances[msg.key])) + same() + } + } + } + + override fun receiveSignal(ctx: ActorContext, signal: Signal): Behavior { + return when (signal) { + is Terminated -> { + instances.values.remove(signal.ref) + same() + } + else -> + unhandled() + } + } + } + } + + /** + * Validate the service for unsatisfiable dependencies. + */ + private fun validateDependencies(providers: Set) { + val providersByKey = HashMap, ServiceProvider>() + for (provider in providers) { + if (provider.provides.isEmpty()) { + throw IllegalArgumentException(("Service provider $provider does not provide any service.")) + } + for (key in provider.provides) { + if (key in providersByKey) { + throw IllegalArgumentException("Multiple providers for service $key") + } + providersByKey[key] = provider + } + } + + val visited = HashSet() + val queue = ArrayDeque(providers) + while (queue.isNotEmpty()) { + val service = queue.poll() + visited.add(service) + + for (dependencyKey in service.dependencies) { + val dependency = providersByKey[dependencyKey] + ?: throw IllegalArgumentException("Dependency $dependencyKey not satisfied for service $service") + if (dependency !in visited) { + queue.add(dependency) + } + } + } + } + + override fun equals(other: Any?): Boolean = other is Zone && uid == other.uid + override fun hashCode(): Int = uid.hashCode() +} + +/** + * A reference to the actor managing the zone. + */ +typealias ZoneRef = ActorRef + +/** + * A message protocol for communicating with the service registry + */ +sealed class ZoneMessage { + /** + * Lookup the specified service in this availability zone. + * + * @property key The key of the service to lookup. + * @property replyTo The address to reply to. + */ + data class Find( + val key: Service<*>, + val replyTo: ActorRef + ) : ZoneMessage() +} + +/** + * A message protocol used by service registry actors to respond to [ZoneMessage]s. + */ +sealed class ZoneResponse { + /** + * The response sent when looking up services in a zone. + * + * @property zone The zone from which the response originates. + * @property key The key of the service that was looked up. + * @property ref The reference to the service or `null` if it is not present in the zone. + */ + data class Listing( + val zone: ZoneRef, + val key: Service<*>, + private val ref: ActorRef<*>? + ) : ZoneResponse() { + /** + * A flag to indicate whether the service is present. + */ + val isPresent: Boolean + get() = ref != null + + /** + * Determine whether this listing is for the specified key. + * + * @param key The key to check for. + * @return `true` if the listing is for this key, `false` otherwise. + */ + fun isForKey(key: Service<*>): Boolean = key == this.key + + /** + * Extract the result from the service lookup. + * + * @param key The key of the lookup. + * @return The reference to the service or `null` if it is not present in the zone. + */ + operator fun invoke(key: Service): ActorRef? { + require(this.key == key) { "Invalid key" } + @Suppress("UNCHECKED_CAST") + return ref as? ActorRef + } + } +} + +/** + * Find the reference to the specified [ServiceProvider]. + * + * @param key The key of the service to find. + * @throws IllegalArgumentException if the service is not found. + */ +suspend fun ZoneRef.find(key: Service): ActorRef { + val ctx = actorContext() + val listing: ZoneResponse.Listing = ctx.ask(this) { ZoneMessage.Find(key, it) } + return listing(key) ?: throw IllegalArgumentException("Unknown key $key") +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/Machine.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/Machine.kt new file mode 100644 index 00000000..08d94e14 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/Machine.kt @@ -0,0 +1,105 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.opendc.model.Identity +import com.atlarge.opendc.model.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid + +/** + * A generic representation of a compute node (either physical or virtual) that is able to run [Application]s. + */ +interface Machine : Identity { + /** + * The details of the machine in key/value pairs. + */ + val details: Map + + /** + * Build the runtime [Behavior] of this compute resource, accepting messages of [MachineMessage]. + * + * @param supervisor The supervisor of the machine. + */ + operator fun invoke(supervisor: ActorRef): Behavior +} + +/** + * A reference to a machine instance that accepts messages of type [MachineMessage]. + */ +typealias MachineRef = ActorRef + +/** + * A message protocol for communicating with machine instances. + */ +sealed class MachineMessage { + /** + * Launch the specified [Application] on the machine instance. + * + * @property application The application to submit. + * @property key The key to identify this submission. + * @property broker The broker of the process to spawn. + */ + data class Submit( + val application: Application, + val key: Any, + val broker: ActorRef + ) : MachineMessage() +} + +/** + * A message protocol used by machine instances to respond to [MachineMessage]s. + */ +sealed class MachineEvent { + /** + * Indicate that an [Application] was spawned on a machine instance. + * + * @property instance The machine instance to which the application was submitted. + * @property application The application that has been submitted. + * @property key The key used to identify the submission. + * @property pid The spawned application instance. + */ + data class Submitted( + val instance: MachineRef, + val application: Application, + val key: Any, + val pid: Pid + ) : MachineEvent() + + /** + * Indicate that an [Application] has terminated on the specified machine. + * + * @property instance The machine instance to which the application was submitted. + * @property pid The reference to the application instance that has terminated. + * @property status The exit code of the task, where zero means successful. + */ + data class Terminated( + val instance: MachineRef, + val pid: Pid, + val status: Int = 0 + ) : MachineEvent() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/MachineStatus.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/MachineStatus.kt new file mode 100644 index 00000000..f4fe9494 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/MachineStatus.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute + +/** + * The status of a machine. + */ +enum class MachineStatus { + HALT, + RUNNING +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingElement.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingElement.kt new file mode 100644 index 00000000..9e5b1b71 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingElement.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute + +/** + * A logical core in a CPU. + * + * @property id The identifier of the core within the CPU. + * @property unit The [ProcessingUnit] the core is part of. + */ +data class ProcessingElement(val id: Int, val unit: ProcessingUnit) diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingUnit.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingUnit.kt new file mode 100644 index 00000000..025087a4 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingUnit.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute + +/** + * A processing unit of a compute resource, either virtual or physical. + * + * @property vendor The vendor string of the cpu. + * @property family The cpu family number. + * @property model The model number of the cpu. + * @property modelName The name of the cpu model. + * @property clockRate The clock speed of the cpu in MHz. + * @property cores The number of logical cores in the cpu. + */ +data class ProcessingUnit( + val vendor: String, + val family: Int, + val model: Int, + val modelName: String, + val clockRate: Double, + val cores: Int +) diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/host/Host.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/host/Host.kt new file mode 100644 index 00000000..ca7ea204 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/host/Host.kt @@ -0,0 +1,86 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.host + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.join +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unhandled +import com.atlarge.odcsim.withTimers +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineMessage +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.resources.compute.scheduling.MachineScheduler +import com.atlarge.opendc.model.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.ProcessSupervisor +import java.util.UUID + +/** + * A physical compute node in a datacenter that is able to run [Application]s. + * + * @property uid The unique identifier of this machine. + * @property name The name of the machine. + * @property scheduler The process scheduler of this machine. + * @property cores The list of processing elements in the machine. + * @property details The details of this host. + */ +data class Host( + override val uid: UUID, + override val name: String, + val scheduler: MachineScheduler, + val cores: List, + override val details: Map = emptyMap() +) : Machine { + /** + * Build the [Behavior] for a physical machine. + */ + override fun invoke(supervisor: ActorRef): Behavior = setup { ctx -> + ctx.send(supervisor, MachineSupervisionEvent.Announce(this, ctx.self)) + ctx.send(supervisor, MachineSupervisionEvent.Up(ctx.self)) + + withTimers { timers -> + val sched = scheduler(this, ctx, timers) + sched.updateResources(cores) + receiveMessage { msg -> + when (msg) { + is MachineMessage.Submit -> { + sched.submit(msg.application, msg.key, msg.broker) + same() + } + else -> + unhandled() + } + }.join(ProcessSupervisor(sched).unsafeCast()).narrow() + } + } + + override fun equals(other: Any?): Boolean = other is Machine && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineScheduler.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineScheduler.kt new file mode 100644 index 00000000..de14f0fe --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineScheduler.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineMessage + +/** + * A factory interface for constructing a [MachineSchedulerLogic]. + */ +interface MachineScheduler { + /** + * Construct a [MachineSchedulerLogic] in the given [ActorContext]. + * + * @param machine The machine to create the scheduler for. + * @param ctx The actor context to construct a scheduler for. + * @param scheduler The timer scheduler to use. + */ + operator fun invoke( + machine: Machine, + ctx: ActorContext, + scheduler: TimerScheduler + ): MachineSchedulerLogic +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineSchedulerLogic.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineSchedulerLogic.kt new file mode 100644 index 00000000..879d6ed8 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineSchedulerLogic.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.resources.compute.MachineMessage +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.ProcessSupervisor + +/** + * A scheduler that distributes processes over processing elements in a machine. + * + * @property machine The machine to create the scheduler for. + * @property ctx The context in which the scheduler runs. + * @property scheduler The timer scheduler to use. + */ +abstract class MachineSchedulerLogic( + protected val machine: Machine, + protected val ctx: ActorContext, + protected val scheduler: TimerScheduler +) : ProcessSupervisor { + /** + * Update the available resources in the machine. + * + * @param cores The available processing cores for the scheduler. + */ + abstract fun updateResources(cores: List) + + /** + * Submit the specified application for scheduling. + * + * @param application The application to submit. + * @param key The key to identify the application instance. + * @param handler The broker of this application. + */ + abstract fun submit(application: Application, key: Any, handler: ActorRef) +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessObserver.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessObserver.kt new file mode 100644 index 00000000..11262d79 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessObserver.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid + +/** + * An interface for observing processes. + */ +interface ProcessObserver { + /** + * This method is invoked when the setup of an application completed successfully. + * + * @param pid The process id of the process that has been initialized. + */ + fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) + + /** + * This method is invoked when a process exits. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + fun onTermination(instance: MachineRef, pid: Pid, status: Int) + + companion object { + /** + * Create the [Behavior] for a [ProcessObserver]. + * + * @param observer The observer to create the behavior for. + */ + operator fun invoke(observer: ProcessObserver): Behavior = receiveMessage { msg -> + when (msg) { + is MachineEvent.Submitted -> observer.onSubmission(msg.instance, msg.application, msg.key, msg.pid) + is MachineEvent.Terminated -> observer.onTermination(msg.instance, msg.pid, msg.status) + } + same() + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessState.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessState.kt new file mode 100644 index 00000000..4c685fc4 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessState.kt @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +/** + * An enumeration of the distinct states of an application instance (process). + */ +enum class ProcessState { + /** + * Default state of a process, where the task is waiting to be assigned and installed on a machine. + */ + CREATED, + + /** + * State to indicate that the process is waiting to be ran. + */ + READY, + + /** + * State to indicate that the process is currently running. + */ + RUNNING, + + /** + * State to indicate that the process has been terminated, either successfully or due to failure. + */ + TERMINATED, +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessView.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessView.kt new file mode 100644 index 00000000..bc88e01f --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessView.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.ActorRef +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid +import com.atlarge.opendc.model.workload.application.ProcessMessage + +/** + * A process represents a application instance running on a particular machine from the machine scheduler's point of + * view. + * + * @property application The application this is an instance of. + * @property broker The broker of the process, which is informed about its progress. + * @property pid The reference to the application instance. + * @property state The state of the process. + */ +data class ProcessView( + val application: Application, + val broker: ActorRef, + val pid: Pid, + var state: ProcessState = ProcessState.CREATED +) { + /** + * The slice of processing elements allocated for the process. Available as soon as the state + * becomes [ProcessState.RUNNING] + */ + lateinit var allocation: ProcessMessage.Allocation +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/SpaceSharedMachineScheduler.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/SpaceSharedMachineScheduler.kt new file mode 100644 index 00000000..457c7070 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/SpaceSharedMachineScheduler.kt @@ -0,0 +1,178 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.odcsim.unsafeCast +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.resources.compute.MachineMessage +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid +import com.atlarge.opendc.model.workload.application.ProcessMessage +import com.atlarge.opendc.model.workload.application.ProcessSupervisor +import java.util.ArrayDeque +import java.util.UUID + +/** + * A machine scheduling policy where processes are space-shared on the machine. + * + * Space-sharing for machine scheduling means that all running processes will be allocated a separate set of the + * [ProcessingElement]s in a [Machine]. Applications are scheduled on the machine in first-in-first-out (FIFO) order, + * thus larger applications may block smaller tasks from proceeding, while space is available (no backfilling). + * + * @property machine The machine to create the scheduler for. + * @property ctx The context in which the scheduler runs. + * @property scheduler The timer scheduler to use. + */ +class SpaceSharedMachineScheduler( + machine: Machine, + ctx: ActorContext, + scheduler: TimerScheduler +) : MachineSchedulerLogic(machine, ctx, scheduler), ProcessSupervisor { + private var cores = 0 + private val available = ArrayDeque() + private val queue = ArrayDeque>() + private val running = LinkedHashSet>() + private val processes = HashMap, ProcessView>() + + override fun updateResources(cores: List) { + available.addAll(cores) + this.cores = cores.size + + // Add all running tasks in front of the queue + running.reversed().forEach { queue.addFirst(it) } + running.clear() + + reschedule() + } + + override fun submit(application: Application, key: Any, handler: ActorRef) { + // Create application instance on the machine + val pid = ctx.spawn(application(), name = application.name + ":" + application.uid + ":" + UUID.randomUUID().toString()) + processes[pid] = ProcessView(application, handler, pid) + + // Setup the task + ctx.send(pid, ProcessMessage.Setup(machine, ctx.self.unsafeCast())) + + // Inform the owner that the task has been submitted + ctx.send(handler, MachineEvent.Submitted(ctx.self, application, key, pid)) + } + + /** + * Reschedule the tasks on this machine. + */ + private fun reschedule() { + while (queue.isNotEmpty()) { + val pid = queue.peek() + val process = processes[pid]!! + + if (process.application.cores >= cores) { + // The task will never fit on the machine + // TODO Fail task + ctx.log.warn("Process {} will not fit in machine: dropping.", process) + queue.remove() + return + } else if (process.application.cores > available.size) { + // The task will not fit at the moment + // Try again if resources become available + ctx.log.debug("Application queued: not enough processing elements available [requested={}, available={}]", + process.application.cores, available.size) + return + } + queue.remove() + + // Compute the available resources + val resources = List(process.application.cores) { + val pe = available.poll() + Pair(pe, 1.0) + }.toMap() + process.state = ProcessState.RUNNING + process.allocation = ProcessMessage.Allocation(resources, Instant.POSITIVE_INFINITY) + running += pid + + ctx.send(pid, process.allocation) + } + } + + override fun onReady(pid: Pid) { + val process = processes[pid]!! + + // Schedule the task if it has been setup + queue.add(pid) + process.state = ProcessState.READY + + reschedule() + } + + override fun onConsume(pid: Pid, utilization: Map, until: Instant) { + val process = processes[pid]!! + val allocation = process.allocation + + if (until > allocation.until) { + // Tasks are not allowed to extend allocation provided by the machine + // TODO Fail the task + ctx.log.warn("Task {} must not extend allocation provided by the machine", pid) + } else if (until < allocation.until) { + // Shrink allocation + process.allocation = allocation.copy(until = until) + } + + // Reschedule the process after the allocation expires + scheduler.after(pid, process.allocation.until - ctx.time) { + // We just extend the allocation + process.allocation = process.allocation.copy(until = Instant.POSITIVE_INFINITY) + ctx.send(pid, process.allocation) + } + } + + override fun onExit(pid: Pid, status: Int) { + val process = processes.remove(pid)!! + running -= pid + scheduler.cancel(pid) + process.allocation.resources.keys.forEach { available.add(it) } + + ctx.log.debug("Application {} terminated with status {}", pid, status) + + // Inform the owner that the task has terminated + ctx.send(process.broker, MachineEvent.Terminated(ctx.self, pid, status)) + + reschedule() + } + + companion object : MachineScheduler { + override fun invoke( + machine: Machine, + ctx: ActorContext, + scheduler: TimerScheduler + ): MachineSchedulerLogic { + return SpaceSharedMachineScheduler(machine, ctx, scheduler) + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisionEvent.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisionEvent.kt new file mode 100644 index 00000000..2b3fae3d --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisionEvent.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.supervision + +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineRef + +/** + * A supervision protocol for [Machine] instances. + */ +sealed class MachineSupervisionEvent { + /** + * Initialization message to introduce to the supervisor a new machine by specifying its static information and + * address. + * + * @property machine The machine that is being announced. + * @property ref The address to talk to the host. + */ + data class Announce(val machine: Machine, val ref: MachineRef) : MachineSupervisionEvent() + + /** + * Indicate that the specified machine has booted up. + * + * @property ref The address to talk to the machine. + */ + data class Up(val ref: MachineRef) : MachineSupervisionEvent() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisor.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisor.kt new file mode 100644 index 00000000..c3607a22 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisor.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.supervision + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineRef + +/** + * An interface for supervising [Machine] instances. + */ +interface MachineSupervisor { + /** + * This method is invoked when a new machine is introduced to the supervisor by specifying its static information + * and address. + * + * @param machine The machine that is being announced. + * @param ref The address to talk to the host. + */ + fun onAnnounce(machine: Machine, ref: MachineRef) + + /** + * This method is invoked when a process exits. + * + * @param ref The address to talk to the machine. + */ + fun onUp(ref: MachineRef) + + companion object { + /** + * Create the [Behavior] for a [MachineSupervisor]. + * + * @param supervisor The supervisor to create the behavior for. + */ + operator fun invoke(supervisor: MachineSupervisor): Behavior = receiveMessage { msg -> + when (msg) { + is MachineSupervisionEvent.Announce -> supervisor.onAnnounce(msg.machine, msg.ref) + is MachineSupervisionEvent.Up -> supervisor.onUp(msg.ref) + } + same() + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/Service.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/Service.kt new file mode 100644 index 00000000..e8b25b88 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/Service.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services + +import com.atlarge.opendc.model.Identity +import java.util.UUID + +/** + * An interface for identifying service implementations of the same type (providing the same service). + * + * @param T The shape of the messages the service responds to. + */ +interface Service : Identity + +/** + * Helper class for constructing a [Service]. + * + * @property uid The unique identifier of the service. + * @property name The name of the service. + */ +abstract class AbstractService(override val uid: UUID, override val name: String) : Service { + override fun equals(other: Any?): Boolean = other is Service<*> && uid == other.uid + override fun hashCode(): Int = uid.hashCode() + override fun toString(): String = "Service[uid=$uid,name=$name]" +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceMap.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceMap.kt new file mode 100644 index 00000000..d91208bf --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceMap.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services + +import com.atlarge.odcsim.ActorRef + +/** + * A map containing services. + */ +interface ServiceMap { + /** + * Determine if this map contains the service with the specified [Service]. + * + * @param key The key of the service to check for. + * @return `true` if the service is in the map, `false` otherwise. + */ + operator fun contains(key: Service<*>): Boolean + + /** + * Obtain the service with the specified [Service]. + * + * @param key The key of the service to obtain. + * @return The references to the service. + * @throws IllegalArgumentException if the key does not exists in the map. + */ + operator fun get(key: Service): ActorRef +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceProvider.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceProvider.kt new file mode 100644 index 00000000..1bf5b22e --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceProvider.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services + +import com.atlarge.odcsim.Behavior +import com.atlarge.opendc.model.Identity +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.ZoneRef +import java.util.UUID + +/** + * An abstract representation of a cloud service implementation provided by a cloud platform. + */ +interface ServiceProvider : Identity { + /** + * The unique identifier of the service implementation. + */ + override val uid: UUID + + /** + * The name of the service implementation. + */ + override val name: String + + /** + * The set of services provided by this [ServiceProvider]. + */ + val provides: Set> + + /** + * The dependencies of the service implementation. + */ + val dependencies: Set> + + /** + * Build the runtime [Behavior] for this service. + * + * @param zone The zone model for which the service should be build. + * @param ref The runtime reference to the zone's actor for communication. + */ + operator fun invoke(zone: Zone, ref: ZoneRef): Behavior<*> +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/ProvisioningService.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/ProvisioningService.kt new file mode 100644 index 00000000..22b70f35 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/ProvisioningService.kt @@ -0,0 +1,77 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.provisioning + +import com.atlarge.odcsim.ActorRef +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.services.AbstractService +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.ServiceProvider +import com.atlarge.opendc.model.services.resources.HostView +import java.util.UUID + +/** + * A cloud platform service that provisions the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +abstract class ProvisioningService : ServiceProvider { + override val provides: Set> = setOf(ProvisioningService) + + /** + * The service key of the provisioner service. + */ + companion object : AbstractService(UUID.randomUUID(), "provisioner") +} + +/** + * A message protocol for communicating to the resource provisioner. + */ +sealed class ProvisioningMessage { + /** + * Request the specified number of resources from the provisioner. + * + * @property numHosts The number of hosts to request from the provisioner. + * @property replyTo The actor to reply to. + */ + data class Request(val numHosts: Int, val replyTo: ActorRef) : ProvisioningMessage() + + /** + * Release the specified resource [ProvisioningResponse.Lease]. + * + * @property lease The lease to release. + */ + data class Release(val lease: ProvisioningResponse.Lease) : ProvisioningMessage() +} + +/** + * A message protocol used by the resource provisioner to respond to [ProvisioningMessage]s. + */ +sealed class ProvisioningResponse { + /** + * A lease for the specified hosts. + */ + data class Lease(val hosts: List) : ProvisioningResponse() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/SimpleProvisioningService.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/SimpleProvisioningService.kt new file mode 100644 index 00000000..1ddc1b69 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/SimpleProvisioningService.kt @@ -0,0 +1,125 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.provisioning + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.StashBuffer +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unhandled +import com.atlarge.odcsim.unsafeCast +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.ZoneMessage +import com.atlarge.opendc.model.ZoneRef +import com.atlarge.opendc.model.ZoneResponse +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.resources.ResourceManagementMessage +import com.atlarge.opendc.model.services.resources.ResourceManagementResponse +import com.atlarge.opendc.model.services.resources.ResourceManagementService +import com.atlarge.opendc.model.services.resources.ResourceManagerRef +import java.util.ArrayDeque +import java.util.UUID + +/** + * A cloud platform service that provisions the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +object SimpleProvisioningService : ProvisioningService() { + override val uid: UUID = UUID.randomUUID() + override val name: String = "simple-provisioner" + override val dependencies: Set> = setOf(ResourceManagementService) + + /** + * Build the runtime [Behavior] for the resource provisioner, responding to messages of shape [ProvisioningMessage]. + */ + override fun invoke(zone: Zone, ref: ZoneRef): Behavior = setup { ctx -> + val buffer = StashBuffer(capacity = 30) + ctx.send(ref, ZoneMessage.Find(ResourceManagementService, ctx.self.unsafeCast())) + + receiveMessage { msg -> + when (msg) { + is ZoneResponse.Listing -> { + val service = msg(ResourceManagementService) ?: throw IllegalStateException("Resource management service not available") + buffer.unstashAll(ctx.unsafeCast(), active(zone, service).unsafeCast()) + } + else -> { + buffer.stash(msg) + same() + } + } + }.narrow() + } + + private fun active(zone: Zone, manager: ResourceManagerRef) = setup { ctx -> + val hosts = mutableMapOf() + val available = ArrayDeque() + val leases = mutableSetOf() + + // Subscribe to all machines in the zone + for (cluster in zone.clusters) { + for (host in cluster.hosts) { + ctx.send(manager, ResourceManagementMessage.Lookup(host, ctx.self.unsafeCast())) + } + } + + receiveMessage { msg -> + when (msg) { + is ProvisioningMessage.Request -> { + ctx.log.info("Provisioning {} hosts", msg.numHosts) + val leaseHosts = mutableListOf() + while (available.isNotEmpty() && leaseHosts.size < msg.numHosts) { + leaseHosts += available.poll() + } + val lease = ProvisioningResponse.Lease(leaseHosts) + leases += lease + ctx.send(msg.replyTo, lease) + same() + } + is ProvisioningMessage.Release -> { + val lease = msg.lease + if (lease in leases) { + return@receiveMessage same() + } + available.addAll(lease.hosts) + leases -= lease + same() + } + is ResourceManagementResponse.Listing -> { + if (msg.instance != null) { + hosts[msg.instance.ref] = msg.instance + available.add(msg.instance) + } + same() + } + else -> + unhandled() + } + }.narrow() + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/HostView.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/HostView.kt new file mode 100644 index 00000000..943461cd --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/HostView.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.resources + +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.resources.compute.MachineStatus +import com.atlarge.opendc.model.resources.compute.host.Host + +/** + * The dynamic information of a [Host] instance that is being tracked by the [ResourceManagementService]. This means + * that information may not be up-to-date. + * + * @property host The static information of the host. + * @property ref The reference to the host's actor. + * @property status The status of the machine. + */ +data class HostView(val host: Host, val ref: MachineRef, val status: MachineStatus = MachineStatus.HALT) { + override fun equals(other: Any?): Boolean = other is HostView && host.uid == other.host.uid + override fun hashCode(): Int = host.uid.hashCode() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/ResourceManagementService.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/ResourceManagementService.kt new file mode 100644 index 00000000..5e38c6da --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/ResourceManagementService.kt @@ -0,0 +1,121 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.resources + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unhandled +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.ZoneRef +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.resources.compute.MachineStatus +import com.atlarge.opendc.model.resources.compute.host.Host +import com.atlarge.opendc.model.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.ServiceProvider +import java.util.UUID + +/** + * A cloud platform service that manages the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +object ResourceManagementService : ServiceProvider, Service { + override val uid: UUID = UUID.randomUUID() + override val name: String = "resource-manager" + override val provides: Set> = setOf(ResourceManagementService) + override val dependencies: Set> = emptySet() + + /** + * Build the runtime behavior of the [ResourceManagementService]. + */ + override fun invoke(zone: Zone, ref: ZoneRef): Behavior = setup { ctx -> + // Launch the clusters of the zone + for (cluster in zone.clusters) { + ctx.spawn(cluster(ctx.self), name = "${cluster.name}-${cluster.uid}") + } + + val hosts = mutableMapOf() + receiveMessage { msg -> + when (msg) { + is MachineSupervisionEvent.Announce -> { + val host = msg.machine as? Host + if (host != null) { + hosts[msg.ref] = HostView(host, msg.ref) + } + same() + } + is MachineSupervisionEvent.Up -> { + hosts.computeIfPresent(msg.ref) { _, value -> + value.copy(status = MachineStatus.RUNNING) + } + same() + } + is ResourceManagementMessage.Lookup -> { + ctx.send(msg.replyTo, ResourceManagementResponse.Listing(hosts.values.find { it.host == msg.host })) + same() + } + else -> + unhandled() + } + }.narrow() + } +} + +/** + * A reference to the resource manager of a zone. + */ +typealias ResourceManagerRef = ActorRef + +/** + * A message protocol for communicating to the resource manager. + */ +sealed class ResourceManagementMessage { + /** + * Lookup the specified [Host]. + * + * @property host The host to lookup. + * @property replyTo The address to sent the response to. + */ + data class Lookup( + val host: Host, + val replyTo: ActorRef + ) : ResourceManagementMessage() +} + +/** + * A message protocol used by the resource manager to respond to [ResourceManagementMessage]s. + */ +sealed class ResourceManagementResponse { + /** + * A response to a [ResourceManagementMessage.Lookup] request. + * + * @property instance The instance that was found or `null` if it does not exist. + */ + data class Listing(val instance: HostView?) : ResourceManagementResponse() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/Workload.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/Workload.kt new file mode 100644 index 00000000..c1215715 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/Workload.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload + +import com.atlarge.opendc.model.Identity +import com.atlarge.opendc.model.User + +/** + * A high-level abstraction that represents the actual work that a set of compute resources perform, such + * as running an application on a machine or a whole workflow running multiple tasks on numerous machines. + */ +interface Workload : Identity { + /** + * The owner of this workload. + */ + val owner: User +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Application.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Application.kt new file mode 100644 index 00000000..00ab98b6 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Application.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.opendc.model.workload.Workload + +/** + * A generic representation of a workload that can directly be executed by physical or virtual compute resources, + * such as a web server application. + */ +interface Application : Workload { + /** + * The number of processing elements required by the task. + */ + val cores: Int + + /** + * Build the runtime [Behavior] of an application, accepting messages of [ProcessMessage]. + * + * This is a model for the runtime behavior of an application instance (process) that describes how an application + * instance consumes the allocated resources on a machine. + */ + operator fun invoke(): Behavior +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplication.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplication.kt new file mode 100644 index 00000000..60a896d4 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplication.kt @@ -0,0 +1,164 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.DeferredBehavior +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.receive +import com.atlarge.odcsim.stopped +import com.atlarge.odcsim.unhandled +import com.atlarge.opendc.model.User +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import java.lang.Double.min +import java.util.UUID +import kotlin.math.ceil + +/** + * An [Application] implementation that models applications performing a static number of floating point operations + * ([flops]) on a compute resource. + * + * @property uid A unique identifier for this application. + * @property name The name of the application. + * @property owner The owner of this application. + * @property cores The number of cores needed for this application. + * @property flops The number of floating point operations to perform for this task. + */ +class FlopsApplication( + override val uid: UUID, + override val name: String, + override val owner: User, + override val cores: Int, + val flops: Long +) : Application { + + init { + require(flops >= 0) { "Negative number of flops" } + } + + /** + * Build the runtime [Behavior] based on a number of floating point operations to execute. + */ + override fun invoke(): Behavior = object : DeferredBehavior() { + /** + * The remaining number of floating point operations to execute. + */ + var remaining = flops + + /** + * The machine to which the task is assigned. + */ + lateinit var machine: Machine + + /** + * The reference to the machine instance. + */ + lateinit var ref: ActorRef + + /** + * The start of the last allocation + */ + var start: Instant = 0.0 + + /** + * The given resource allocation. + */ + lateinit var allocation: Map + + override fun invoke(ctx: ActorContext) = created() + + /** + * Handle the initial, created state of a task instance. + */ + private fun created(): Behavior = receive { ctx, msg -> + when (msg) { + is ProcessMessage.Setup -> { + machine = msg.machine + ref = msg.ref + /* TODO implement setup time */ + ctx.send(ref, ProcessEvent.Ready(ctx.self)) + ready().narrow() + } + else -> unhandled() + } + } + + /** + * Handle the ready state of a task instance. + */ + private fun ready(): Behavior = receive { ctx, msg -> + when (msg) { + is ProcessMessage.Allocation -> { + processAllocation(ctx, msg.resources, msg.until) + running() + } + else -> unhandled() + } + } + + /** + * Handle the running state of a task instance. + */ + private fun running(): Behavior = receive { ctx, msg -> + when (msg) { + is ProcessMessage.Allocation -> { + /* Compute the consumption of flops */ + val consumed = allocation.asSequence() + .map { (key, value) -> key.unit.clockRate * value * (ctx.time - start) } + .sum() + // Ceil to prevent consumed flops being rounded to 0 + remaining -= ceil(consumed).toLong() + + /* Test whether all flops have been consumed and the task is finished */ + if (remaining <= 0) { + ctx.send(ref, ProcessEvent.Exit(ctx.self, 0)) + return@receive stopped() + } + + processAllocation(ctx, msg.resources, msg.until) + running().narrow() + } + else -> unhandled() + } + } + + private fun processAllocation(ctx: ActorContext, resources: Map, until: Instant) { + start = ctx.time + allocation = resources + .asSequence() + .take(cores) + .associateBy({ it.key }, { it.value }) + + val speed = allocation.asSequence() + .map { (key, value) -> key.unit.clockRate * value } + .average() + val finishedAt = ctx.time + remaining / speed + ctx.send(ref, ProcessEvent.Consume(ctx.self, allocation, min(finishedAt, until))) + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Process.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Process.kt new file mode 100644 index 00000000..a78b8572 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Process.kt @@ -0,0 +1,91 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Instant +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.ProcessingElement + +/** + * The process id (pid) is a reference to the application instance (process) that accepts messages of + * type [ProcessMessage]. + */ +typealias Pid = ActorRef + +/** + * A message protocol for actors to communicate with task instances (called processes). + */ +sealed class ProcessMessage { + /** + * Indicate that the task should be installed to the specified machine. + * + * @property machine The machine to install the task. + * @property ref The reference to the machine instance. + */ + data class Setup(val machine: Machine, val ref: ActorRef) : ProcessMessage() + + /** + * Indicate an allocation of compute resources on a machine for a certain duration. + * The task may assume that the reservation occurs after installation on the same machine. + * + * @property resources The cpu cores (and the utilization percentages) allocated for the task. + * @property until The point in time till which the reservation is valid. + */ + data class Allocation(val resources: Map, val until: Instant) : ProcessMessage() +} + +/** + * The message protocol used by application instances respond to [ProcessMessage]s. + */ +sealed class ProcessEvent { + /** + * Indicate that the process is ready to start processing. + * + * @property pid A reference to the application instance. + */ + data class Ready(val pid: Pid) : ProcessEvent() + + /** + * Indicate the estimated resource utilization of the task until a specified point in time. + * + * @property pid A reference to the application instance of the represented utilization. + * @property utilization The utilization of the cpu cores as a percentage. + * @property until The point in time until which the utilization is valid. + */ + data class Consume( + val pid: Pid, + val utilization: Map, + val until: Instant + ) : ProcessEvent() + + /** + * Indicate that a process has been terminated. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + data class Exit(val pid: Pid, val status: Int) : ProcessEvent() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/ProcessSupervisor.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/ProcessSupervisor.kt new file mode 100644 index 00000000..2952c82e --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/ProcessSupervisor.kt @@ -0,0 +1,79 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.opendc.model.resources.compute.ProcessingElement + +/** + * An interface for supervising processes. + */ +interface ProcessSupervisor { + /** + * This method is invoked when the setup of an application completed successfully. + * + * @param pid The process id of the process that has been initialized. + */ + fun onReady(pid: Pid) {} + + /** + * This method is invoked when a process informs the machine that it is running with the + * estimated resource utilization until a specified point in time. + * + * @param pid The process id of the process that is running. + * @param utilization The utilization of the cpu cores as a percentage. + * @param until The point in time until which the utilization is valid. + */ + fun onConsume(pid: Pid, utilization: Map, until: Instant) {} + + /** + * This method is invoked when a process exits. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + fun onExit(pid: Pid, status: Int) {} + + companion object { + /** + * Create the [Behavior] for a [ProcessSupervisor]. + * + * @param supervisor The supervisor to create the behavior for. + */ + operator fun invoke(supervisor: ProcessSupervisor): Behavior { + return receiveMessage { msg -> + when (msg) { + is ProcessEvent.Ready -> supervisor.onReady(msg.pid) + is ProcessEvent.Consume -> supervisor.onConsume(msg.pid, msg.utilization, msg.until) + is ProcessEvent.Exit -> supervisor.onExit(msg.pid, msg.status) + } + same() + } + } + } +} diff --git a/opendc-core/src/test/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplicationTest.kt b/opendc-core/src/test/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplicationTest.kt new file mode 100644 index 00000000..2cde1b6f --- /dev/null +++ b/opendc-core/src/test/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplicationTest.kt @@ -0,0 +1,128 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.testkit.BehaviorTestKit +import com.atlarge.opendc.model.User +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.resources.compute.ProcessingUnit +import com.nhaarman.mockitokotlin2.mock +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import java.util.UUID + +/** + * A test suite for the [FlopsApplication]. + */ +@DisplayName("FlopsApplication") +internal class FlopsApplicationTest { + private val flops = 10000L + private val cores = 2 + private val machine: Machine = mock() + private val user: User = mock() + private val cpu: ProcessingUnit = ProcessingUnit("Intel", 6, 8600, "Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz", 2900.0, 1) + + private lateinit var application: FlopsApplication + + @BeforeEach + fun setUp() { + application = FlopsApplication(UUID.randomUUID(), "java", user, cores, flops) + } + + @Test + fun `should become ready after triggering installation`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox() + test.run(ProcessMessage.Setup(machine, inbox.ref)) + inbox.expectMessage(ProcessEvent.Ready(test.ref)) + } + + @Test + fun `should not respond to setup request after being created`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox() + + // Setup Machine + test.run(ProcessMessage.Setup(machine, inbox.ref)) + inbox.clear() + + // Try again + assertFalse(test.run(ProcessMessage.Setup(machine, inbox.ref))) + } + + @Test + fun `should respond to allocation with consumption`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox() + + val allocation = ProcessMessage.Allocation(mapOf(ProcessingElement(0, cpu) to 0.5), until = 10.0) + + // Setup Machine + test.run(ProcessMessage.Setup(machine, inbox.ref)) + inbox.clear() + + // Test allocation + test.run(allocation) + val msg = inbox.receiveMessage() + assertTrue(msg is ProcessEvent.Consume) + } + + @Test + fun `should inform the machine that it finished processing`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox() + val allocation = ProcessMessage.Allocation(mapOf(ProcessingElement(0, cpu) to 0.5), until = 10.0) + + // Setup + test.run(ProcessMessage.Setup(machine, inbox.ref)) + test.run(allocation) + test.runTo(10.0) + inbox.clear() + + test.run(allocation) + inbox.expectMessage(ProcessEvent.Exit(test.ref, 0)) + } + + @Test + fun `should be able to update its utilization`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox() + val allocation1 = ProcessMessage.Allocation(mapOf(ProcessingElement(0, cpu) to 0.5), until = 10.0) + val allocation2 = ProcessMessage.Allocation(mapOf(ProcessingElement(0, cpu) to 0.25), until = 10.0) + + // Setup + test.run(ProcessMessage.Setup(machine, inbox.ref)) + test.run(allocation1) + test.runTo(5.0) + inbox.clear() + + test.run(allocation2) + assertTrue(inbox.receiveMessage() is ProcessEvent.Consume) + } +} diff --git a/opendc-experiments-tpds/build.gradle.kts b/opendc-experiments-tpds/build.gradle.kts new file mode 100644 index 00000000..3ec580af --- /dev/null +++ b/opendc-experiments-tpds/build.gradle.kts @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` + application +} + +/* Project configuration */ +repositories { + jcenter() +} + +application { + mainClassName = "com.atlarge.opendc.experiments.tpds.TestExperiment" +} + +dependencies { + api(project(":opendc-core")) + implementation(project(":opendc-format-gwf")) + implementation(project(":opendc-format-sc18")) + implementation(project(":opendc-workflows")) + implementation(kotlin("stdlib")) + + runtimeOnly(project(":odcsim-engine-omega")) + runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.11.2") +} diff --git a/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt b/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt new file mode 100644 index 00000000..ad302889 --- /dev/null +++ b/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt @@ -0,0 +1,148 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.tpds + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorSystemFactory +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.odcsim.coroutines.suspending +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.stopped +import com.atlarge.odcsim.unhandled +import com.atlarge.odcsim.withTimers +import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader +import com.atlarge.opendc.format.trace.gwf.GwfTraceReader +import com.atlarge.opendc.model.Broker +import com.atlarge.opendc.model.Model +import com.atlarge.opendc.model.PlatformRef +import com.atlarge.opendc.model.find +import com.atlarge.opendc.model.services.provisioning.SimpleProvisioningService +import com.atlarge.opendc.model.services.resources.ResourceManagementService +import com.atlarge.opendc.model.services.workflows.StageWorkflowScheduler +import com.atlarge.opendc.model.services.workflows.WorkflowEvent +import com.atlarge.opendc.model.services.workflows.WorkflowMessage +import com.atlarge.opendc.model.services.workflows.WorkflowSchedulerMode +import com.atlarge.opendc.model.services.workflows.WorkflowService +import com.atlarge.opendc.model.services.workflows.stages.job.FifoJobSortingPolicy +import com.atlarge.opendc.model.services.workflows.stages.job.NullJobAdmissionPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.FirstFitResourceSelectionPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.FunctionalResourceDynamicFilterPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.FifoTaskSortingPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.FunctionalTaskEligibilityPolicy +import com.atlarge.opendc.model.workload.workflow.Job +import com.atlarge.opendc.model.zones +import java.io.File +import java.util.ServiceLoader +import kotlin.math.max + +/** + * Main entry point of the experiment. + */ +fun main(args: Array) { + if (args.isEmpty()) { + println("error: Please provide path to GWF trace") + return + } + + + val scheduler = StageWorkflowScheduler( + mode = WorkflowSchedulerMode.Batch(100.0), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobSortingPolicy = FifoJobSortingPolicy(), + taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), + taskSortingPolicy = FifoTaskSortingPolicy(), + resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), + resourceSelectionPolicy = FirstFitResourceSelectionPolicy() + ) + + val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) + .use { it.read() } + .let { env -> + env.copy(platforms = env.platforms.map { platform -> + platform.copy(zones = platform.zones.map { zone -> + val services = zone.services + setOf(ResourceManagementService, SimpleProvisioningService, WorkflowService(scheduler)) + zone.copy(services = services) + }) + }) + } + + val broker = object : Broker { + override fun invoke(platforms: List): Behavior<*> = suspending { ctx -> + val zones = platforms.first().zones() + val service = zones.values.first().find(WorkflowService) + + val activeJobs = mutableSetOf() + val reader = GwfTraceReader(File(args[0])) + + fun submitNext(ctx: ActorContext, timers: TimerScheduler) { + if (!reader.hasNext()) { + return + } + + val (time, job) = reader.next() + timers.after(job, max(.0, time - ctx.time)) { + ctx.send(service, WorkflowMessage.Submit(job, ctx.self)) + submitNext(ctx, timers) + } + } + + var total = 0 + var finished = 0 + + withTimers { timers -> + submitNext(ctx, timers) + receiveMessage { msg -> + when (msg) { + is WorkflowEvent.JobSubmitted -> { + ctx.log.info("Job {} submitted", msg.job.uid) + total += 1 + same() + } + is WorkflowEvent.JobStarted -> { + activeJobs += msg.job + same() + } + is WorkflowEvent.JobFinished -> { + activeJobs -= msg.job + finished += 1 + ctx.log.info("Jobs {}/{} finished ({} tasks)", finished, total, msg.job.tasks.size) + if (activeJobs.isEmpty()) stopped() else same() + } + else -> + unhandled() + } + } + } + } + } + + val model = Model(environment, listOf(broker)) + val factory = ServiceLoader.load(ActorSystemFactory::class.java).first() + val system = factory(model(), name = "sim") + system.run() + system.terminate() +} diff --git a/opendc-experiments-tpds/src/main/resources/env/setup-test.json b/opendc-experiments-tpds/src/main/resources/env/setup-test.json new file mode 100644 index 00000000..0965b250 --- /dev/null +++ b/opendc-experiments-tpds/src/main/resources/env/setup-test.json @@ -0,0 +1,36 @@ +{ + "name": "Experimental Setup 2", + "rooms": [ + { + "type": "SERVER", + "objects": [ + { + "type": "RACK", + "machines": [ + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]} + ] + }, + { + "type": "RACK", + "machines": [ + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]} + ] + } + ] + } + ] +} diff --git a/opendc-experiments-tpds/src/main/resources/log4j2.xml b/opendc-experiments-tpds/src/main/resources/log4j2.xml new file mode 100644 index 00000000..67bf34ab --- /dev/null +++ b/opendc-experiments-tpds/src/main/resources/log4j2.xml @@ -0,0 +1,52 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/opendc-format-gwf/build.gradle.kts b/opendc-format-gwf/build.gradle.kts new file mode 100644 index 00000000..caf86c0c --- /dev/null +++ b/opendc-format-gwf/build.gradle.kts @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":opendc-core")) + api(project(":opendc-format")) + api(project(":opendc-workflows")) + implementation(kotlin("stdlib")) + + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") +} diff --git a/opendc-format-gwf/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format-gwf/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt new file mode 100644 index 00000000..df6a4b11 --- /dev/null +++ b/opendc-format-gwf/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt @@ -0,0 +1,168 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.gwf + +import com.atlarge.odcsim.Instant +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.model.User +import com.atlarge.opendc.model.workload.application.FlopsApplication +import com.atlarge.opendc.model.workload.workflow.Job +import com.atlarge.opendc.model.workload.workflow.Task +import java.io.BufferedReader +import java.io.File +import java.io.InputStream +import java.util.UUID +import kotlin.math.max +import kotlin.math.min + +/** + * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more + * information about the format. + * + * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore + * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a + * different format for better performance. + * + * @param reader The buffered reader to read the trace with. + */ +class GwfTraceReader(reader: BufferedReader) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Create a [GwfTraceReader] instance from the specified [File]. + * + * @param file The file to read from. + */ + constructor(file: File) : this(file.bufferedReader()) + + /** + * Create a [GwfTraceReader] instance from the specified [InputStream]. + * + * @param input The input stream to read from. + */ + constructor(input: InputStream) : this(input.bufferedReader()) + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf() + val tasks = mutableMapOf() + val taskDependencies = mutableMapOf>() + + var workflowIdCol = 0 + var taskIdCol = 0 + var submitTimeCol = 0 + var runtimeCol = 0 + var coreCol = 0 + var dependencyCol = 0 + + try { + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(",") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + workflowIdCol = header["WorkflowID"]!! + taskIdCol = header["JobID"]!! + submitTimeCol = header["SubmitTime"]!! + runtimeCol = header["RunTime"]!! + coreCol = header["NProcs"]!! + dependencyCol = header["Dependencies"]!! + return@forEachIndexed + } + + val workflowId = values[workflowIdCol].trim().toLong() + val taskId = values[taskIdCol].trim().toLong() + val submitTime = values[submitTimeCol].trim().toDouble() + val runtime = max(0, values[runtimeCol].trim().toLong()) + val cores = values[coreCol].trim().toInt() + val dependencies = values[dependencyCol].split(" ") + .filter { it.isNotEmpty() } + .map { it.trim().toLong() } + + val flops: Long = 4000 * runtime * cores + + val entry = entries.getOrPut(workflowId) { + TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "", UnnamedUser, HashSet())) + } + val workflow = entry.workload + val task = Task( + UUID(0L, taskId), "", + FlopsApplication(UUID(0L, taskId), "", workflow.owner, cores, flops), + HashSet() + ) + entry.submissionTime = min(entry.submissionTime, submitTime) + (workflow.tasks as MutableSet).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } + } finally { + reader.close() + } + + // Fix dependencies and dependents for all tasks + taskDependencies.forEach { (task, dependencies) -> + (task.dependencies as MutableSet).addAll(dependencies.map { taskId -> + tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") + }) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Instant, + override val workload: Job + ) : TraceEntry +} diff --git a/opendc-format-gwf/src/test/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReaderTest.kt b/opendc-format-gwf/src/test/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReaderTest.kt new file mode 100644 index 00000000..ca60f61d --- /dev/null +++ b/opendc-format-gwf/src/test/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReaderTest.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.gwf + +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +/** + * Test suite for the [GwfTraceReader] class. + */ +@DisplayName("GwfTraceReader") +internal class GwfTraceReaderTest { + @Test + fun `should open from InputStream`() { + val input = GwfTraceReaderTest::class.java.getResourceAsStream("/askalon_workload_olde.gwf") + val reader = GwfTraceReader(input) + reader.close() + } +} diff --git a/opendc-format-gwf/src/test/resources/trace.gwf b/opendc-format-gwf/src/test/resources/trace.gwf new file mode 100644 index 00000000..75b4c8d8 --- /dev/null +++ b/opendc-format-gwf/src/test/resources/trace.gwf @@ -0,0 +1,4 @@ +WorkflowID, JobID , SubmitTime, RunTime , NProcs , ReqNProcs , Dependencies +0 , 1 , 50 , 13 , 1 , 1 , +1 , 2 , 64 , 13 , 1 , 1 , +1 , 3 , 1821 , 12 , 1 , 1 , 2 diff --git a/opendc-format-sc18/build.gradle.kts b/opendc-format-sc18/build.gradle.kts new file mode 100644 index 00000000..b656f4e3 --- /dev/null +++ b/opendc-format-sc18/build.gradle.kts @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":opendc-core")) + api(project(":opendc-format")) + api(project(":opendc-workflows")) + api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") + implementation(kotlin("stdlib")) + implementation(kotlin("reflect")) + + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") +} diff --git a/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt b/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt new file mode 100644 index 00000000..4fbde269 --- /dev/null +++ b/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt @@ -0,0 +1,44 @@ +package com.atlarge.opendc.format.environment.sc18 + +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo + +/** + * A datacenter setup. + * + * @property name The name of the setup. + * @property rooms The rooms in the datacenter. + */ +data class Setup(val name: String, val rooms: List) + +/** + * A room in a datacenter. + * + * @property type The type of room in the datacenter. + * @property objects The objects in the room. + */ +data class Room(val type: String, val objects: List) + +/** + * An object in a [Room]. + * + * @property type The type of the room object. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) +sealed class RoomObject(val type: String) { + /** + * A rack in a server room. + * + * @property machines The machines in the rack. + */ + data class Rack(val machines: List) : RoomObject("RACK") +} + +/** + * A machine in the setup that consists of the specified CPU's represented as + * integer identifiers and ethernet speed. + * + * @property cpus The CPUs in the machine represented as integer identifiers. + */ +data class Machine(val cpus: List) diff --git a/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt new file mode 100644 index 00000000..40ed5a45 --- /dev/null +++ b/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -0,0 +1,95 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment.sc18 + +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.model.Cluster +import com.atlarge.opendc.model.Environment +import com.atlarge.opendc.model.Platform +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.resources.compute.ProcessingUnit +import com.atlarge.opendc.model.resources.compute.host.Host +import com.atlarge.opendc.model.resources.compute.scheduling.SpaceSharedMachineScheduler +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import java.io.InputStream +import java.util.UUID + +/** + * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Datacenter + * Schedulers". + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { + /** + * The environment that was read from the file. + */ + private val environment: Environment + + init { + val setup = mapper.readValue(input) + val clusters = setup.rooms.mapIndexed { i, room -> + var counter = 0 + val hosts = room.objects.flatMap { roomObject -> + when (roomObject) { + is RoomObject.Rack -> { + roomObject.machines.map { machine -> + val cores = machine.cpus.flatMap { id -> + when (id) { + 1 -> List(4) { ProcessingElement(it, CPUS[0]) } + 2 -> List(2) { ProcessingElement(it, CPUS[1]) } + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } + } + Host(UUID.randomUUID(), "node-${counter++}", SpaceSharedMachineScheduler, cores) + } + } + } + } + Cluster(UUID.randomUUID(), "cluster-$i", hosts) + } + + val platform = Platform(UUID.randomUUID(), "sc18-platform", listOf( + Zone(UUID.randomUUID(), "zone", emptySet(), clusters) + )) + + environment = Environment(setup.name, null, listOf(platform)) + } + + override fun read(): Environment = environment + + override fun close() {} + + companion object { + val CPUS = arrayOf( + ProcessingUnit("Intel", 6, 6920, "Intel(R) Core(TM) i7-6920HQ CPU @ 4.10GHz", 4100.0, 1), + ProcessingUnit("Intel", 6, 6930, "Intel(R) Core(TM) i7-6920HQ CPU @ 3.50GHz", 3500.0, 1) + ) + } +} diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts new file mode 100644 index 00000000..68f9aa5d --- /dev/null +++ b/opendc-format/build.gradle.kts @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":odcsim-core")) + api(project(":opendc-core")) + + implementation(kotlin("stdlib")) + + testImplementation(project(":odcsim-testkit")) + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") + testRuntimeOnly("org.slf4j:slf4j-simple:1.7.25") + testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.0.0") +} diff --git a/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt new file mode 100644 index 00000000..0ba3ae25 --- /dev/null +++ b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment + +import com.atlarge.opendc.model.Environment +import java.io.Closeable + +/** + * An interface for reading descriptions of datacenter environments into memory as [Environment]. + */ +interface EnvironmentReader : Closeable { + /** + * Read the description of the datacenter environment as [Environment]. + */ + fun read(): Environment +} diff --git a/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt new file mode 100644 index 00000000..cf0ab526 --- /dev/null +++ b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.odcsim.Instant +import com.atlarge.opendc.model.workload.Workload + +/** + * An entry in a workload trace. + * + * @param T The shape of the workload in this entry. + */ +interface TraceEntry { + /** + * The time of submission of the workload. + */ + val submissionTime: Instant + + /** + * The workload in this trace entry. + */ + val workload: T + + /** + * Extract the submission time from this entry. + */ + operator fun component1() = submissionTime + + /** + * Extract the workload from this entry. + */ + operator fun component2() = workload +} diff --git a/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt new file mode 100644 index 00000000..af8b272d --- /dev/null +++ b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.opendc.model.workload.Workload +import java.io.Closeable + +/** + * An interface for reading [Workload]s into memory. + * + * This interface must guarantee that the entries are delivered in order of submission time. + * + * @param T The shape of the workloads supported by this reader. + */ +interface TraceReader : Iterator>, Closeable diff --git a/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt new file mode 100644 index 00000000..b5424fd2 --- /dev/null +++ b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt @@ -0,0 +1,46 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.odcsim.Instant +import com.atlarge.opendc.model.workload.Workload +import java.io.Closeable + +/** + * An interface for persisting workload traces (e.g. to disk). + * + * @param T The type of [Workload] supported by this writer. + */ +interface TraceWriter : Closeable { + /** + * Write an entry to the trace. + * + * Entries must be written in order of submission time. Failing to do so results in a [IllegalArgumentException]. + * + * @param submissionTime The time of submission of the workload. + * @param workload The workload to write to the trace. + */ + fun write(submissionTime: Instant, workload: T) +} diff --git a/opendc-workflows/build.gradle.kts b/opendc-workflows/build.gradle.kts new file mode 100644 index 00000000..68f9aa5d --- /dev/null +++ b/opendc-workflows/build.gradle.kts @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":odcsim-core")) + api(project(":opendc-core")) + + implementation(kotlin("stdlib")) + + testImplementation(project(":odcsim-testkit")) + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") + testRuntimeOnly("org.slf4j:slf4j-simple:1.7.25") + testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.0.0") +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt new file mode 100644 index 00000000..45f3c4b0 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.model.services.workflows.stages.job.JobAdmissionPolicy +import com.atlarge.opendc.model.services.workflows.stages.job.JobSortingPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceDynamicFilterPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceSelectionPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.TaskEligibilityPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.TaskSortingPolicy + +/** + * A [WorkflowScheduler] that distributes work through a multi-stage process based on the Reference Architecture for + * Datacenter Scheduling. + */ +class StageWorkflowScheduler( + private val mode: WorkflowSchedulerMode, + private val jobAdmissionPolicy: JobAdmissionPolicy, + private val jobSortingPolicy: JobSortingPolicy, + private val taskEligibilityPolicy: TaskEligibilityPolicy, + private val taskSortingPolicy: TaskSortingPolicy, + private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, + private val resourceSelectionPolicy: ResourceSelectionPolicy +) : WorkflowScheduler { + override fun invoke( + ctx: ActorContext, + timers: TimerScheduler, + lease: ProvisioningResponse.Lease + ): WorkflowSchedulerLogic { + return StageWorkflowSchedulerLogic(ctx, timers, lease, mode, jobAdmissionPolicy, + jobSortingPolicy, taskEligibilityPolicy, taskSortingPolicy, resourceDynamicFilterPolicy, resourceSelectionPolicy) + } +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt new file mode 100644 index 00000000..9d5f4bea --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt @@ -0,0 +1,248 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.odcsim.unsafeCast +import com.atlarge.opendc.model.resources.compute.MachineMessage +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.resources.compute.scheduling.ProcessState +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.stages.job.JobAdmissionPolicy +import com.atlarge.opendc.model.services.workflows.stages.job.JobSortingPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceDynamicFilterPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceSelectionPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.TaskEligibilityPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.TaskSortingPolicy +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid +import com.atlarge.opendc.model.workload.workflow.Job +import com.atlarge.opendc.model.workload.workflow.Task + +/** + * Logic of the [StageWorkflowScheduler]. + */ +class StageWorkflowSchedulerLogic( + ctx: ActorContext, + timers: TimerScheduler, + lease: ProvisioningResponse.Lease, + private val mode: WorkflowSchedulerMode, + private val jobAdmissionPolicy: JobAdmissionPolicy, + private val jobSortingPolicy: JobSortingPolicy, + private val taskEligibilityPolicy: TaskEligibilityPolicy, + private val taskSortingPolicy: TaskSortingPolicy, + private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, + private val resourceSelectionPolicy: ResourceSelectionPolicy +) : WorkflowSchedulerLogic(ctx, timers, lease) { + + /** + * The incoming jobs ready to be processed by the scheduler. + */ + internal val incomingJobs: MutableSet = mutableSetOf() + + /** + * The active jobs in the system. + */ + internal val activeJobs: MutableSet = mutableSetOf() + + /** + * The running tasks by [Pid]. + */ + internal val taskByPid = mutableMapOf() + + /** + * The available processor cores on the leased machines. + */ + internal val machineCores: MutableMap = HashMap() + + init { + lease.hosts.forEach { machineCores[it] = it.host.cores.count() } + } + + override fun submit(job: Job, handler: ActorRef) { + // J1 Incoming Jobs + val jobInstance = JobView(job, handler) + val instances = job.tasks.associateWith { + TaskView(jobInstance, it) + } + + for ((task, instance) in instances) { + instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) + task.dependencies.forEach { + instances[it]!!.dependents.add(instance) + } + + // If the task has no dependency, it is a root task and can immediately be evaluated + if (instance.isRoot) { + instance.state = ProcessState.READY + } + } + + jobInstance.tasks = instances.values.toMutableSet() + incomingJobs += jobInstance + ctx.send(handler, WorkflowEvent.JobSubmitted(ctx.self, job, ctx.time)) + requestCycle() + } + + /** + * Indicate to the scheduler that a scheduling cycle is needed. + */ + private fun requestCycle() { + when (mode) { + is WorkflowSchedulerMode.Interactive -> { + schedule() + } + is WorkflowSchedulerMode.Batch -> { + timers.after(mode, mode.quantum) { + schedule() + } + } + } + } + + /** + * Perform a scheduling cycle immediately. + */ + override fun schedule() { + // J2 Create list of eligible jobs + jobAdmissionPolicy.startCycle(this) + val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) } + for (jobInstance in eligibleJobs) { + incomingJobs -= jobInstance + activeJobs += jobInstance + ctx.send(jobInstance.broker, WorkflowEvent.JobStarted(ctx.self, jobInstance.job, ctx.time)) + } + + // J3 Sort jobs on criterion + val sortedJobs = jobSortingPolicy(this, activeJobs) + + // J4 Per job + for (jobInstance in sortedJobs) { + // T1 Create list of eligible tasks + taskEligibilityPolicy.startCycle(this) + val eligibleTasks = jobInstance.tasks.filter { taskEligibilityPolicy.isEligible(this, it) } + + // T2 Sort tasks on criterion + val sortedTasks = taskSortingPolicy(this, eligibleTasks) + + // T3 Per task + for (instance in sortedTasks) { + val hosts = resourceDynamicFilterPolicy(this, lease.hosts, instance) + val host = resourceSelectionPolicy.select(this, hosts, instance) + + if (host != null) { + // T4 Submit task to machine + ctx.send(host.ref, MachineMessage.Submit(instance.task.application, instance, ctx.self.unsafeCast())) + instance.host = host + instance.state = ProcessState.RUNNING // Assume the application starts running + machineCores.merge(host, instance.task.application.cores, Int::minus) + } else { + return + } + } + } + } + + override fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) { + val task = key as TaskView + task.pid = pid + taskByPid[pid] = task + ctx.send(task.job.broker, WorkflowEvent.TaskStarted(ctx.self, task.job.job, task.task, ctx.time)) + } + + override fun onTermination(instance: MachineRef, pid: Pid, status: Int) { + val task = taskByPid.remove(pid)!! + val job = task.job + task.state = ProcessState.TERMINATED + job.tasks.remove(task) + machineCores.merge(task.host!!, task.task.application.cores, Int::plus) + ctx.send(job.broker, WorkflowEvent.TaskFinished(ctx.self, job.job, task.task, status, ctx.time)) + + if (job.isFinished) { + activeJobs -= job + ctx.send(job.broker, WorkflowEvent.JobFinished(ctx.self, job.job, ctx.time)) + } + + requestCycle() + } + + class JobView(val job: Job, val broker: ActorRef) { + /** + * A flag to indicate whether this job is finished. + */ + val isFinished: Boolean + get() = tasks.isEmpty() + + lateinit var tasks: MutableSet + } + + class TaskView(val job: JobView, val task: Task) { + /** + * The dependencies of this task. + */ + val dependencies = HashSet() + + /** + * The dependents of this task. + */ + val dependents = HashSet() + + /** + * A flag to indicate whether this workflow task instance is a workflow root. + */ + val isRoot: Boolean + get() = dependencies.isEmpty() + + var state: ProcessState = ProcessState.CREATED + set(value) { + field = value + + // Mark the process as terminated in the graph + if (value == ProcessState.TERMINATED) { + markTerminated() + } + } + + var pid: Pid? = null + + var host: HostView? = null + + /** + * Mark the specified [TaskView] as terminated. + */ + private fun markTerminated() { + for (dependent in dependents) { + dependent.dependencies.remove(this) + + if (dependent.isRoot) { + dependent.state = ProcessState.READY + } + } + } + } +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt new file mode 100644 index 00000000..c81085d4 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse + +/** + * A factory interface for constructing a [WorkflowSchedulerLogic]. + */ +interface WorkflowScheduler { + /** + * Construct a [WorkflowSchedulerLogic] in the given [ActorContext]. + * + * @param ctx The context in which the scheduler runs. + * @param timers The timer scheduler to use. + * @param lease The resource lease to use. + */ + operator fun invoke( + ctx: ActorContext, + timers: TimerScheduler, + lease: ProvisioningResponse.Lease + ): WorkflowSchedulerLogic +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt new file mode 100644 index 00000000..09cb0ef9 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.resources.compute.scheduling.ProcessObserver +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.model.workload.workflow.Job + +/** + * A workflow scheduler interface that schedules jobs across machines. + * + * @property ctx The context in which the scheduler runs. + * @property timers The timer scheduler to use. + * @property lease The resource lease to use. + */ +abstract class WorkflowSchedulerLogic( + protected val ctx: ActorContext, + protected val timers: TimerScheduler, + protected val lease: ProvisioningResponse.Lease +) : ProcessObserver { + /** + * Submit the specified workflow for scheduling. + */ + abstract fun submit(job: Job, handler: ActorRef) + + /** + * Trigger an immediate scheduling cycle. + */ + abstract fun schedule() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt new file mode 100644 index 00000000..0a4b40e5 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.Duration + +/** + * The operating mode of a workflow scheduler. + */ +sealed class WorkflowSchedulerMode { + /** + * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. + */ + object Interactive : WorkflowSchedulerMode() + + /** + * A batch scheduler triggers a scheduling cycle every time quantum if needed. + */ + data class Batch(val quantum: Duration) : WorkflowSchedulerMode() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt new file mode 100644 index 00000000..72397203 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt @@ -0,0 +1,193 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.odcsim.coroutines.actorContext +import com.atlarge.odcsim.coroutines.dsl.ask +import com.atlarge.odcsim.coroutines.suspending +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.unhandled +import com.atlarge.odcsim.withTimers +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.ZoneRef +import com.atlarge.opendc.model.find +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.services.AbstractService +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.ServiceProvider +import com.atlarge.opendc.model.services.provisioning.ProvisioningMessage +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.model.services.provisioning.ProvisioningService +import com.atlarge.opendc.model.workload.workflow.Job +import com.atlarge.opendc.model.workload.workflow.Task +import java.util.UUID + +/** + * A service for cloud workflow management. + * + * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al. + */ +class WorkflowService(val scheduler: WorkflowScheduler) : ServiceProvider { + override val uid: UUID = UUID.randomUUID() + override val name: String = "workflows" + override val provides: Set> = setOf(WorkflowService) + override val dependencies: Set> = setOf(ProvisioningService) + + /** + * Build the runtime [Behavior] for the workflow service, responding to messages of shape [WorkflowMessage]. + */ + override fun invoke(zone: Zone, ref: ZoneRef): Behavior = suspending { ctx -> + val provisioner = ref.find(ProvisioningService) + // Wait for 0.1 sec before asking the provisioner to allow it to initialize. Will return empty response if asked + // immediately. + val lease: ProvisioningResponse.Lease = actorContext().ask(provisioner, after = 0.1) { ProvisioningMessage.Request(Int.MAX_VALUE, it) } + + withTimers { timers -> + @Suppress("UNCHECKED_CAST") + val schedulerLogic = scheduler(ctx, timers as TimerScheduler, lease) + + receiveMessage { msg -> + when (msg) { + is WorkflowMessage.Submit -> { + schedulerLogic.submit(msg.job, msg.broker) + same() + } + is MachineEvent.Submitted -> { + schedulerLogic.onSubmission(msg.instance, msg.application, msg.key, msg.pid) + same() + } + is MachineEvent.Terminated -> { + schedulerLogic.onTermination(msg.instance, msg.pid, msg.status) + same() + } + else -> + unhandled() + } + } + }.narrow() + } + + companion object : AbstractService(UUID.randomUUID(), "workflows") +} + +/** + * A reference to the workflow service instance. + */ +typealias WorkflowServiceRef = ActorRef + +/** + * A message protocol for communicating to the workflow service. + */ +sealed class WorkflowMessage { + /** + * Submit the specified [Job] to the workflow service for scheduling. + * + * @property job The workflow to submit for scheduling. + * @property broker The broker that has submitted this workflow on behalf of a user and that needs to be kept + * up-to-date. + */ + data class Submit(val job: Job, val broker: ActorRef) : WorkflowMessage() +} + +/** + * A message protocol used by the workflow service to respond to [WorkflowMessage]s. + */ +sealed class WorkflowEvent { + /** + * Indicate that the specified [Job] was submitted to the workflow service. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that has been submitted. + * @property time A timestamp of the moment the job was received. + */ + data class JobSubmitted( + val service: WorkflowServiceRef, + val job: Job, + val time: Instant + ) : WorkflowEvent() + + /** + * Indicate that the specified [Job] has become active. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that has been submitted. + * @property time A timestamp of the moment the job started. + */ + data class JobStarted( + val service: WorkflowServiceRef, + val job: Job, + val time: Instant + ) : WorkflowEvent() + + /** + * Indicate that the specified [Task] has started processing. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that contains this task. + * @property task The task that has started processing. + * @property time A timestamp of the moment the task started. + */ + data class TaskStarted( + val service: WorkflowServiceRef, + val job: Job, + val task: Task, + val time: Instant + ) : WorkflowEvent() + + /** + * Indicate that the specified [Task] has started processing. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that contains this task. + * @property task The task that has started processing. + * @property status The exit code of the task, where zero means successful. + * @property time A timestamp of the moment the task finished. + */ + data class TaskFinished( + val service: WorkflowServiceRef, + val job: Job, + val task: Task, + val status: Int, + val time: Instant + ) : WorkflowEvent() + + /** + * Indicate that the specified [Job] has finished processing. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that has finished processing. + * @property time A timestamp of the moment the task finished. + */ + data class JobFinished( + val service: WorkflowServiceRef, + val job: Job, + val time: Instant + ) : WorkflowEvent() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt new file mode 100644 index 00000000..c58d2210 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue. + */ +class FifoJobSortingPolicy : JobSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + jobs: Collection + ): List = jobs.toList() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt new file mode 100644 index 00000000..be60fa9b --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A policy interface for admitting [StageWorkflowSchedulerLogic.JobView]s to a scheduling cycle. + */ +interface JobAdmissionPolicy { + /** + * A method that is invoked at the start of each scheduling cycle. + * + * @param scheduler The scheduler that started the cycle. + */ + fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + + /** + * Determine whether the specified [StageWorkflowSchedulerLogic.JobView] should be admitted to the scheduling cycle. + * + * @param scheduler The scheduler that should admit or reject the job. + * @param job The workflow that has been submitted. + * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise. + */ + fun shouldAdmit(scheduler: StageWorkflowSchedulerLogic, job: StageWorkflowSchedulerLogic.JobView): Boolean +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt new file mode 100644 index 00000000..3af88aa7 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A policy interface for ordering admitted workflows in the scheduling queue. + */ +interface JobSortingPolicy { + /** + * Sort the given collection of jobs on a given criterion. + * + * @param scheduler The scheduler that started the cycle. + * @param jobs The collection of tasks that should be sorted. + * @return The sorted list of jobs. + */ + operator fun invoke( + scheduler: StageWorkflowSchedulerLogic, + jobs: Collection + ): List +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt new file mode 100644 index 00000000..5436a1a1 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A [JobAdmissionPolicy] that admits all jobs. + */ +object NullJobAdmissionPolicy : JobAdmissionPolicy { + /** + * Admit every submitted job. + */ + override fun shouldAdmit( + scheduler: StageWorkflowSchedulerLogic, + job: StageWorkflowSchedulerLogic.JobView + ): Boolean = true +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt new file mode 100644 index 00000000..7da59692 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic +import kotlin.random.Random + +/** + * The [RandomJobSortingPolicy] sorts tasks randomly. + * + * @property random The [Random] instance to use when sorting the list of tasks. + */ +class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + jobs: Collection + ): List = jobs.shuffled(random) +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt new file mode 100644 index 00000000..afaf075d --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.resources + +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A [ResourceSelectionPolicy] that selects the first machine that is available. + */ +class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { + override fun select( + scheduler: StageWorkflowSchedulerLogic, + machines: List, + task: StageWorkflowSchedulerLogic.TaskView + ): HostView? = + machines.firstOrNull() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt new file mode 100644 index 00000000..3f28a040 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.resources + +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for + * the task. + */ +class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + machines: List, + task: StageWorkflowSchedulerLogic.TaskView + ): List { + return machines + .filter { scheduler.machineCores[it] ?: 0 >= task.task.application.cores } + } +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt new file mode 100644 index 00000000..f73c0d9e --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.resources + +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding + * a list of resources with sufficient resource-capacities, based on fixed or dynamic requirements, and on predicted or + * monitored information about processing unit availability, memory occupancy, etc. + */ +interface ResourceDynamicFilterPolicy { + /** + * Filter the list of machines based on dynamic information. + * + * @param scheduler The scheduler to filter the machines. + * @param machines The list of machines in the system. + * @param task The task that is to be scheduled. + * @return The machines on which the task can be scheduled. + */ + operator fun invoke( + scheduler: StageWorkflowSchedulerLogic, + machines: List, + task: StageWorkflowSchedulerLogic.TaskView + ): List +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt new file mode 100644 index 00000000..a9172a53 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.resources + +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected + * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. + */ +interface ResourceSelectionPolicy { + /** + * Select a machine on which the task should be scheduled. + * + * @param scheduler The scheduler to select the machine. + * @param machines The list of machines in the system. + * @param task The task that is to be scheduled. + * @return The selected machine or `null` if no machine could be found. + */ + fun select( + scheduler: StageWorkflowSchedulerLogic, + machines: List, + task: StageWorkflowSchedulerLogic.TaskView + ): HostView? +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt new file mode 100644 index 00000000..2eb2f6fb --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue. + */ +class FifoTaskSortingPolicy : TaskSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + tasks: Collection + ): List = tasks.toList() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt new file mode 100644 index 00000000..2e7cc8c1 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.resources.compute.scheduling.ProcessState +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. + */ +class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy { + override fun isEligible( + scheduler: StageWorkflowSchedulerLogic, + task: StageWorkflowSchedulerLogic.TaskView + ): Boolean = task.state == ProcessState.READY +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt new file mode 100644 index 00000000..69462e41 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic +import kotlin.random.Random + +/** + * The [RandomTaskSortingPolicy] sorts tasks randomly. + * + * @property random The [Random] instance to use when sorting the list of tasks. + */ +class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + tasks: Collection + ): List = tasks.shuffled(random) +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt new file mode 100644 index 00000000..c3c7e725 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A policy interface for determining the eligibility of tasks in a scheduling cycle. + */ +interface TaskEligibilityPolicy { + /** + * A method that is invoked at the start of each scheduling cycle. + * + * @param scheduler The scheduler that started the cycle. + */ + fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + + /** + * Determine whether the specified [StageWorkflowSchedulerLogic.TaskView] is eligible to be scheduled. + * + * @param scheduler The scheduler that is determining whether the task is eligible. + * @param task The task instance to schedule. + * @return `true` if the task eligible to be scheduled, `false` otherwise. + */ + fun isEligible(scheduler: StageWorkflowSchedulerLogic, task: StageWorkflowSchedulerLogic.TaskView): Boolean +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt new file mode 100644 index 00000000..3f296d0e --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the + * scheduler with a sorted list of tasks to schedule. + */ +interface TaskSortingPolicy { + /** + * Sort the given list of tasks on a given criterion. + * + * @param scheduler The scheduler that is sorting the tasks. + * @param tasks The collection of tasks that should be sorted. + * @return The sorted list of tasks. + */ + operator fun invoke( + scheduler: StageWorkflowSchedulerLogic, + tasks: Collection + ): List +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt new file mode 100644 index 00000000..dd72cf6d --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.workflow + +import com.atlarge.opendc.model.User +import com.atlarge.opendc.model.workload.Workload +import java.util.UUID + +/** + * A workload that represents a directed acyclic graph (DAG) of tasks with control and data dependencies between tasks. + * + * @property uid A unique identified of this workflow. + * @property name The name of this workflow. + * @property owner The owner of the workflow. + * @property tasks The tasks that are part of this workflow. + */ +data class Job( + override val uid: UUID, + override val name: String, + override val owner: User, + val tasks: Set +) : Workload { + override fun equals(other: Any?): Boolean = other is Job && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt new file mode 100644 index 00000000..0cc3fa0e --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.workflow + +import com.atlarge.opendc.model.Identity +import com.atlarge.opendc.model.workload.application.Application +import java.util.UUID + +/** + * A stage of a [Job]. + * + * @property uid A unique identified of this task. + * @property name The name of this task. + * @property application The application to run as part of this workflow task. + * @property dependencies The dependencies of this task in order for it to execute. + */ +data class Task( + override val uid: UUID, + override val name: String, + val application: Application, + val dependencies: Set +) : Identity { + override fun equals(other: Any?): Boolean = other is Task && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/settings.gradle.kts b/settings.gradle.kts index a9b2ea83..085885ba 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -27,3 +27,9 @@ include(":odcsim-core") include(":odcsim-engine-tests") include(":odcsim-engine-omega") include(":odcsim-testkit") +include(":opendc-core") +include(":opendc-experiments-tpds") +include(":opendc-format") +include(":opendc-format-gwf") +include(":opendc-format-sc18") +include(":opendc-workflows") -- cgit v1.2.3