diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-11 13:53:00 +0100 |
|---|---|---|
| committer | Georgios Andreadis <info@gandreadis.com> | 2020-02-11 14:40:15 +0100 |
| commit | 30d03df110bd0f2f805eaf89026660926929fa38 (patch) | |
| tree | 7e4d56085ea6536cb6a1d22aed8c5fd46bf5540c /opendc/opendc-core | |
| parent | 5de6ec076fa8bc19c34449bcc085dca184d2e17f (diff) | |
refactor: Reimplement OpenDC model using 2.x API
Diffstat (limited to 'opendc/opendc-core')
34 files changed, 2380 insertions, 0 deletions
diff --git a/opendc/opendc-core/build.gradle.kts b/opendc/opendc-core/build.gradle.kts new file mode 100644 index 00000000..0ac1f1ea --- /dev/null +++ b/opendc/opendc-core/build.gradle.kts @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Base model for datacenter simulation" + +/* Build configuration */ +plugins { + `kotlin-library-convention` +} + +dependencies { + implementation(kotlin("stdlib")) + api(project(":odcsim:odcsim-api")) + + testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") + testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Broker.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Broker.kt new file mode 100644 index 00000000..a3d6b0a7 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Broker.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef + +/** + * A broker acting on the various cloud platforms on behalf of the user. + */ +interface Broker { + /** + * Build the runtime behavior of the [Broker]. + * + * @param platforms A list of available cloud platforms. + * @return The runtime behavior of the broker. + */ + suspend operator fun invoke(ctx: ProcessContext, platforms: List<SendRef<PlatformMessage>>) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt new file mode 100644 index 00000000..da9aed00 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.resources.compute.MachineMessage +import com.atlarge.opendc.core.resources.compute.host.Host +import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent +import java.util.UUID + +/** + * A logical grouping of heterogeneous hosts and primary storage within a zone. + * + * @property uid The unique identifier of the cluster. + * @property name The name of this cluster. + * @property hosts The machines in this cluster. + */ +data class Cluster(override val uid: UUID, override val name: String, val hosts: List<Host>) : Identity { + /** + * Build the runtime [Behavior] of this cluster of hosts. + * + * @param manager The manager of the cluster. + */ + suspend operator fun invoke(ctx: ProcessContext, manager: SendRef<MachineSupervisionEvent>) { + // Launch all hosts in the cluster + for (host in hosts) { + val channel = ctx.open<MachineMessage>() + ctx.spawn({ host(it, manager, channel) }, name = host.name) + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt new file mode 100644 index 00000000..5bdff0b6 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Environment.kt @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +/** + * A description of a large-scale computing environment. This description includes including key size and topology + * information of the environment, types of resources, but also various operational and management rules such as + * scheduled maintenance, allocation and other constraints. + * + * @property name The name of the environment. + * @property description A small textual description about the environment that is being modeled + * @property platforms The cloud platforms (such as AWS or GCE) in this environment. + */ +data class Environment(val name: String, val description: String?, val platforms: List<Platform>) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Identity.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Identity.kt new file mode 100644 index 00000000..c87e934f --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Identity.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import java.util.UUID + +/** + * An object that has a unique identity. + */ +interface Identity { + /** + * A unique, opaque, system-generated value, representing the object. + */ + val uid: UUID + + /** + * A non-empty, human-readable string representing the object. + */ + val name: String +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt new file mode 100644 index 00000000..3d16c4b2 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.ProcessContext + +/** + * A simulation model for large-scale simulation of datacenter infrastructure, built with the *odcsim* API. + * + * @property environment The environment in which brokers operate. + * @property brokers The brokers acting on the cloud platforms. + */ +data class Model(val environment: Environment, val brokers: List<Broker>) { + /** + * Build the runtime behavior of the universe. + */ + suspend operator fun invoke(ctx: ProcessContext) { + // Setup the environment + val platforms = environment.platforms.map { platform -> + val channel = ctx.open<PlatformMessage>() + ctx.spawn({ platform(it, channel.receive) }, name = platform.name) + channel.send + } + + for (broker in brokers) { + ctx.spawn { broker(it, platforms) } + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt new file mode 100644 index 00000000..fab67962 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt @@ -0,0 +1,103 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.ask +import com.atlarge.odcsim.sendOnce +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * A representation of a cloud platform such as Amazon Web Services (AWS), Microsoft Azure or Google Cloud. + * + * @property uid The unique identifier of this datacenter. + * @property name the name of the platform. + * @property zones The availability zones available on this platform. + */ +data class Platform(override val uid: UUID, override val name: String, val zones: List<Zone>) : Identity { + /** + * Build the runtime [Behavior] of this cloud platform. + */ + suspend operator fun invoke(ctx: ProcessContext, main: ReceiveRef<PlatformMessage>) { + println("Starting cloud platform $name [$uid] with ${zones.size} zones") + + // Launch all zones of the cloud platform + val zoneInstances = zones.associateWith { zone -> + val channel = ctx.open<ZoneMessage>() + ctx.spawn({ zone(it, channel) }, name = zone.name) + channel.send + } + + val inlet = ctx.listen(main) + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is PlatformMessage.ListZones -> { + msg.replyTo.sendOnce(PlatformResponse.Zones(this@Platform, zoneInstances.mapKeys { it.key.name })) + } + } + } + } + inlet.close() + } +} + +/** + * A message protocol for communicating with a cloud platform. + */ +sealed class PlatformMessage { + /** + * Request the available zones on this platform. + * + * @property replyTo The actor address to send the response to. + */ + data class ListZones(val replyTo: SendRef<PlatformResponse.Zones>) : PlatformMessage() +} + +/** + * A message protocol used by platform actors to respond to [PlatformMessage]s. + */ +sealed class PlatformResponse { + /** + * The zones available on this cloud platform. + * + * @property platform The reference to the cloud platform these are the zones of. + * @property zones The zones in this cloud platform. + */ + data class Zones(val platform: Platform, val zones: Map<String, SendRef<ZoneMessage>>) : PlatformResponse() +} + +/** + * Retrieve the available zones of a platform. + */ +suspend fun SendRef<PlatformMessage>.zones(): Map<String, SendRef<ZoneMessage>> { + val zones: PlatformResponse.Zones = ask { PlatformMessage.ListZones(it) } + return zones.zones +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/User.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/User.kt new file mode 100644 index 00000000..6105ae9e --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/User.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +/** + * A user of the cloud network. + */ +interface User : Identity { + /** + * The name of the user. + */ + override val name: String +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt new file mode 100644 index 00000000..07361423 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt @@ -0,0 +1,194 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.ask +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.services.Service +import com.atlarge.opendc.core.services.ServiceProvider +import java.util.ArrayDeque +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An isolated location within a datacenter region from which public cloud services operate, roughly equivalent to a + * single datacenter. Zones contain one or more clusters and secondary storage. + * + * This class models *only* the static information of a zone, with dynamic information being contained within the zone's + * actor. During runtime, it's actor acts as a registry for all the cloud services provided by the zone. + * + * @property uid The unique identifier of this availability zone + * @property name The name of the zone within its platform. + * @property services The initial set of services provided by the zone. + * @property clusters The clusters of machines in this zone. + */ +data class Zone( + override val uid: UUID, + override val name: String, + val services: Set<ServiceProvider>, + val clusters: List<Cluster> +) : Identity { + /** + * Build the runtime [Behavior] of this datacenter. + */ + suspend operator fun invoke(ctx: ProcessContext, main: Channel<ZoneMessage>) { + println("Starting zone $name [$uid]") + + // Launch all services of the zone + val instances: MutableMap<Service<*>, SendRef<*>> = mutableMapOf() + validateDependencies(services) + + for (provider in services) { + val channel = ctx.open<Any>() + println("Spawning service ${provider.name}") + ctx.spawn({ provider(it, this, main.send, channel) }, name = "${provider.name}-${provider.uid}") + provider.provides.forEach { instances[it] = channel.send } + } + + val inlet = ctx.listen(main.receive) + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is ZoneMessage.Find -> { + msg.replyTo.sendOnce(ZoneResponse.Listing(this@Zone, msg.key, instances[msg.key])) + } + } + } + } + } + + /** + * Validate the service for unsatisfiable dependencies. + */ + private fun validateDependencies(providers: Set<ServiceProvider>) { + val providersByKey = HashMap<Service<*>, ServiceProvider>() + for (provider in providers) { + if (provider.provides.isEmpty()) { + throw IllegalArgumentException(("Service provider $provider does not provide any service.")) + } + for (key in provider.provides) { + if (key in providersByKey) { + throw IllegalArgumentException("Multiple providers for service $key") + } + providersByKey[key] = provider + } + } + + val visited = HashSet<ServiceProvider>() + val queue = ArrayDeque(providers) + while (queue.isNotEmpty()) { + val service = queue.poll() + visited.add(service) + + for (dependencyKey in service.dependencies) { + val dependency = providersByKey[dependencyKey] + ?: throw IllegalArgumentException("Dependency $dependencyKey not satisfied for service $service") + if (dependency !in visited) { + queue.add(dependency) + } + } + } + } + + override fun equals(other: Any?): Boolean = other is Zone && uid == other.uid + override fun hashCode(): Int = uid.hashCode() +} + +/** + * A message protocol for communicating with the service registry + */ +sealed class ZoneMessage { + /** + * Lookup the specified service in this availability zone. + * + * @property key The key of the service to lookup. + * @property replyTo The address to reply to. + */ + data class Find( + val key: Service<*>, + val replyTo: SendRef<ZoneResponse.Listing> + ) : ZoneMessage() +} + +/** + * A message protocol used by service registry actors to respond to [ZoneMessage]s. + */ +sealed class ZoneResponse { + /** + * The response sent when looking up services in a zone. + * + * @property zone The zone from which the response originates. + * @property key The key of the service that was looked up. + * @property ref The reference to the service or `null` if it is not present in the zone. + */ + data class Listing( + val zone: Zone, + val key: Service<*>, + private val ref: SendRef<*>? + ) : ZoneResponse() { + /** + * A flag to indicate whether the service is present. + */ + val isPresent: Boolean + get() = ref != null + + /** + * Determine whether this listing is for the specified key. + * + * @param key The key to check for. + * @return `true` if the listing is for this key, `false` otherwise. + */ + fun isForKey(key: Service<*>): Boolean = key == this.key + + /** + * Extract the result from the service lookup. + * + * @param key The key of the lookup. + * @return The reference to the service or `null` if it is not present in the zone. + */ + operator fun <T : Any> invoke(key: Service<T>): SendRef<T>? { + require(this.key == key) { "Invalid key" } + @Suppress("UNCHECKED_CAST") + return ref as? SendRef<T> + } + } +} + +/** + * Find the reference to the specified [ServiceProvider]. + * + * @param key The key of the service to find. + * @throws IllegalArgumentException if the service is not found. + */ +suspend fun <T : Any> SendRef<ZoneMessage>.find(key: Service<T>): SendRef<T> { + val listing: ZoneResponse.Listing = ask { ZoneMessage.Find(key, it) } + return listing(key) ?: throw IllegalArgumentException("Unknown key $key") +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt new file mode 100644 index 00000000..f25fa3cc --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt @@ -0,0 +1,107 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.Identity +import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.Pid + +/** + * A generic representation of a compute node (either physical or virtual) that is able to run [Application]s. + */ +interface Machine : Identity { + /** + * The details of the machine in key/value pairs. + */ + val details: Map<String, Any> + + /** + * Build the runtime [Behavior] of this compute resource, accepting messages of [MachineMessage]. + * + * @param supervisor The supervisor of the machine. + */ + suspend operator fun invoke(ctx: ProcessContext, supervisor: SendRef<MachineSupervisionEvent>, main: Channel<MachineMessage>) +} + +/** + * A reference to a machine instance that accepts messages of type [MachineMessage]. + */ +typealias MachineRef = SendRef<MachineMessage> + +/** + * A message protocol for communicating with machine instances. + */ +sealed class MachineMessage { + /** + * Launch the specified [Application] on the machine instance. + * + * @property application The application to submit. + * @property key The key to identify this submission. + * @property broker The broker of the process to spawn. + */ + data class Submit( + val application: Application, + val key: Any, + val broker: SendRef<MachineEvent> + ) : MachineMessage() +} + +/** + * A message protocol used by machine instances to respond to [MachineMessage]s. + */ +sealed class MachineEvent { + /** + * Indicate that an [Application] was spawned on a machine instance. + * + * @property instance The machine instance to which the application was submitted. + * @property application The application that has been submitted. + * @property key The key used to identify the submission. + * @property pid The spawned application instance. + */ + data class Submitted( + val instance: MachineRef, + val application: Application, + val key: Any, + val pid: Pid + ) : MachineEvent() + + /** + * Indicate that an [Application] has terminated on the specified machine. + * + * @property instance The machine instance to which the application was submitted. + * @property pid The reference to the application instance that has terminated. + * @property status The exit code of the task, where zero means successful. + */ + data class Terminated( + val instance: MachineRef, + val pid: Pid, + val status: Int = 0 + ) : MachineEvent() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/MachineStatus.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/MachineStatus.kt new file mode 100644 index 00000000..af039bcc --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/MachineStatus.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute + +/** + * The status of a machine. + */ +enum class MachineStatus { + HALT, + RUNNING +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt new file mode 100644 index 00000000..23a5b444 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute + +/** + * A logical core in a CPU. + * + * @property id The identifier of the core within the CPU. + * @property unit The [ProcessingUnit] the core is part of. + */ +data class ProcessingElement(val id: Int, val unit: ProcessingUnit) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt new file mode 100644 index 00000000..76985f64 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute + +/** + * A processing unit of a compute resource, either virtual or physical. + * + * @property vendor The vendor string of the cpu. + * @property family The cpu family number. + * @property model The model number of the cpu. + * @property modelName The name of the cpu model. + * @property clockRate The clock speed of the cpu in MHz. + * @property cores The number of logical cores in the cpu. + */ +data class ProcessingUnit( + val vendor: String, + val family: Int, + val model: Int, + val modelName: String, + val clockRate: Double, + val cores: Int +) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt new file mode 100644 index 00000000..21217468 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt @@ -0,0 +1,83 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.host + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineMessage +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import com.atlarge.opendc.core.resources.compute.scheduling.MachineScheduler +import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * A physical compute node in a datacenter that is able to run [Application]s. + * + * @property uid The unique identifier of this machine. + * @property name The name of the machine. + * @property scheduler The process scheduler of this machine. + * @property cores The list of processing elements in the machine. + * @property details The details of this host. + */ +data class Host( + override val uid: UUID, + override val name: String, + val scheduler: MachineScheduler, + val cores: List<ProcessingElement>, + override val details: Map<String, Any> = emptyMap() +) : Machine { + /** + * Build the [Behavior] for a physical machine. + */ + override suspend fun invoke(ctx: ProcessContext, supervisor: SendRef<MachineSupervisionEvent>, main: Channel<MachineMessage>) { + coroutineScope { + supervisor.sendOnce(MachineSupervisionEvent.Announce(this@Host, main.send)) + supervisor.sendOnce(MachineSupervisionEvent.Up(main.send)) + + val sched = scheduler(ctx, this, this@Host, main.send) + sched.updateResources(cores) + + val inlet = ctx.listen(main.receive) + while (isActive) { + when (val msg = inlet.receive()) { + is MachineMessage.Submit -> { + sched.submit(msg.application, msg.key, msg.broker) + } + } + } + inlet.close() + } + } + + override fun equals(other: Any?): Boolean = other is Machine && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt new file mode 100644 index 00000000..400c6a0f --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineRef +import kotlinx.coroutines.CoroutineScope + +/** + * A factory interface for constructing a [MachineSchedulerLogic]. + */ +interface MachineScheduler { + /** + * Construct a [MachineSchedulerLogic] in the given [ProcessContext]. + * + * @param machine The machine to create the scheduler for. + */ + operator fun invoke(ctx: ProcessContext, coroutineScope: CoroutineScope, machine: Machine, machineRef: MachineRef): MachineSchedulerLogic +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt new file mode 100644 index 00000000..9bc20eb8 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.ProcessSupervisor +import kotlinx.coroutines.CoroutineScope + +/** + * A scheduler that distributes processes over processing elements in a machine. + * + * @property ctx The context in which the scheduler runs. + * @property machine The machine to create the scheduler for. + */ +abstract class MachineSchedulerLogic( + protected val ctx: ProcessContext, + protected val coroutineScope: CoroutineScope, + protected val machine: Machine, + protected val machineRef: MachineRef +) : ProcessSupervisor { + /** + * Update the available resources in the machine. + * + * @param cores The available processing cores for the scheduler. + */ + abstract suspend fun updateResources(cores: List<ProcessingElement>) + + /** + * Submit the specified application for scheduling. + * + * @param application The application to submit. + * @param key The key to identify the application instance. + * @param handler The broker of this application. + */ + abstract suspend fun submit(application: Application, key: Any, handler: SendRef<MachineEvent>) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt new file mode 100644 index 00000000..2cfeec06 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt @@ -0,0 +1,77 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.Pid +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An interface for observing processes. + */ +interface ProcessObserver { + /** + * This method is invoked when the setup of an application completed successfully. + * + * @param pid The process id of the process that has been initialized. + */ + fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) + + /** + * This method is invoked when a process exits. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + fun onTermination(instance: MachineRef, pid: Pid, status: Int) + + companion object { + /** + * Create the [Behavior] for a [ProcessObserver]. + * + * @param observer The observer to create the behavior for. + */ + suspend operator fun invoke(ctx: ProcessContext, observer: ProcessObserver, main: ReceiveRef<Any>) { + val inlet = ctx.listen(main) + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is MachineEvent.Submitted -> observer.onSubmission(msg.instance, msg.application, msg.key, msg.pid) + is MachineEvent.Terminated -> observer.onTermination(msg.instance, msg.pid, msg.status) + } + } + } + + inlet.close() + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessState.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessState.kt new file mode 100644 index 00000000..e9e9a53e --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessState.kt @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +/** + * An enumeration of the distinct states of an application instance (process). + */ +enum class ProcessState { + /** + * Default state of a process, where the task is waiting to be assigned and installed on a machine. + */ + CREATED, + + /** + * State to indicate that the process is waiting to be ran. + */ + READY, + + /** + * State to indicate that the process is currently running. + */ + RUNNING, + + /** + * State to indicate that the process has been terminated, either successfully or due to failure. + */ + TERMINATED, +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt new file mode 100644 index 00000000..daf71af4 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.Pid +import com.atlarge.opendc.core.workload.application.ProcessMessage + +/** + * A process represents a application instance running on a particular machine from the machine scheduler's point of + * view. + * + * @property application The application this is an instance of. + * @property broker The broker of the process, which is informed about its progress. + * @property pid The reference to the application instance. + * @property state The state of the process. + */ +data class ProcessView( + val application: Application, + val broker: SendRef<MachineEvent>, + val pid: Pid, + var state: ProcessState = ProcessState.CREATED +) { + /** + * The slice of processing elements allocated for the process. Available as soon as the state + * becomes [ProcessState.RUNNING] + */ + lateinit var allocation: ProcessMessage.Allocation +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt new file mode 100644 index 00000000..6cdc3ea5 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt @@ -0,0 +1,189 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.scheduling + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineEvent +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import com.atlarge.opendc.core.workload.application.Application +import com.atlarge.opendc.core.workload.application.Pid +import com.atlarge.opendc.core.workload.application.ProcessEvent +import com.atlarge.opendc.core.workload.application.ProcessMessage +import com.atlarge.opendc.core.workload.application.ProcessSupervisor +import java.util.ArrayDeque +import java.util.UUID +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +/** + * A machine scheduling policy where processes are space-shared on the machine. + * + * Space-sharing for machine scheduling means that all running processes will be allocated a separate set of the + * [ProcessingElement]s in a [Machine]. Applications are scheduled on the machine in first-in-first-out (FIFO) order, + * thus larger applications may block smaller tasks from proceeding, while space is available (no backfilling). + * + * @property ctx The context in which the scheduler runs. + * @property machine The machine to create the scheduler for. + */ +class SpaceSharedMachineScheduler( + ctx: ProcessContext, + coroutineScope: CoroutineScope, + machine: Machine, + machineRef: MachineRef +) : MachineSchedulerLogic(ctx, coroutineScope, machine, machineRef), ProcessSupervisor { + private var cores = 0 + private val available = ArrayDeque<ProcessingElement>() + private val queue = ArrayDeque<SendRef<ProcessMessage>>() + private val running = LinkedHashSet<SendRef<ProcessMessage>>() + private val processes = HashMap<SendRef<ProcessMessage>, ProcessView>() + private val jobs = HashMap<SendRef<ProcessMessage>, Job>() + private val channel = ctx.open<ProcessEvent>() + + init { + coroutineScope.launch { + ProcessSupervisor(ctx, this@SpaceSharedMachineScheduler, channel.receive) + } + } + + override suspend fun updateResources(cores: List<ProcessingElement>) { + available.addAll(cores) + this.cores = cores.size + + // Add all running tasks in front of the queue + running.reversed().forEach { queue.addFirst(it) } + running.clear() + + reschedule() + } + + override suspend fun submit(application: Application, key: Any, handler: SendRef<MachineEvent>) { + val channel = ctx.open<ProcessMessage>() + val pid = channel.send + // Create application instance on the machine + ctx.spawn({ application(it, pid, channel.receive) }, name = application.name + ":" + application.uid + ":" + UUID.randomUUID().toString()) + processes[pid] = ProcessView(application, handler, pid) + + // Inform the owner that the task has been submitted + handler.sendOnce(MachineEvent.Submitted(machineRef, application, key, pid)) + + // Setup the task + pid.sendOnce(ProcessMessage.Setup(machine, this@SpaceSharedMachineScheduler.channel.send)) + } + + /** + * Reschedule the tasks on this machine. + */ + private fun reschedule() { + while (queue.isNotEmpty()) { + val pid = queue.peek() + val process = processes[pid]!! + + if (process.application.cores >= cores) { + // The task will never fit on the machine + // TODO Fail task + println("Process $process will not fit in machine: dropping.") + queue.remove() + return + } else if (process.application.cores > available.size) { + // The task will not fit at the moment + // Try again if resources become available + // ctx.log.debug("Application queued: not enough processing elements available [requested={}, available={}]", + // process.application.cores, available.size) + return + } + queue.remove() + + // Compute the available resources + val resources = List(process.application.cores) { + val pe = available.poll() + Pair(pe, 1.0) + }.toMap() + process.state = ProcessState.RUNNING + process.allocation = ProcessMessage.Allocation(resources, Long.MAX_VALUE) + running += pid + + coroutineScope.launch(Dispatchers.Unconfined) { + pid.sendOnce(process.allocation) + } + } + } + + override fun onReady(pid: Pid) { + val process = processes[pid]!! + + // Schedule the task if it has been setup + queue.add(pid) + process.state = ProcessState.READY + + reschedule() + } + + override fun onConsume(pid: Pid, utilization: Map<ProcessingElement, Double>, until: Long) { + val process = processes[pid]!! + val allocation = process.allocation + + if (until > allocation.until) { + // Tasks are not allowed to extend allocation provided by the machine + // TODO Fail the task + println("Task $pid must not extend allocation provided by the machine") + } else if (until < allocation.until) { + // Shrink allocation + process.allocation = allocation.copy(until = until) + } + + // Reschedule the process after the allocation expires + jobs[pid] = coroutineScope.launch { + delay(process.allocation.until - ctx.clock.millis()) + // We just extend the allocation + process.allocation = process.allocation.copy(until = Long.MAX_VALUE) + pid.sendOnce(process.allocation) + } + } + + override fun onExit(pid: Pid, status: Int) { + val process = processes.remove(pid)!! + running -= pid + jobs[pid]?.cancel() + process.allocation.resources.keys.forEach { available.add(it) } + + // Inform the owner that the task has terminated + coroutineScope.launch { + process.broker.sendOnce(MachineEvent.Terminated(machineRef, pid, status)) + } + } + + companion object : MachineScheduler { + override fun invoke(ctx: ProcessContext, coroutineScope: CoroutineScope, machine: Machine, machineRef: MachineRef): MachineSchedulerLogic { + return SpaceSharedMachineScheduler(ctx, coroutineScope, machine, machineRef) + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt new file mode 100644 index 00000000..8a022112 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.supervision + +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineRef + +/** + * A supervision protocol for [Machine] instances. + */ +sealed class MachineSupervisionEvent { + /** + * Initialization message to introduce to the supervisor a new machine by specifying its static information and + * address. + * + * @property machine The machine that is being announced. + * @property ref The address to talk to the host. + */ + data class Announce(val machine: Machine, val ref: MachineRef) : MachineSupervisionEvent() + + /** + * Indicate that the specified machine has booted up. + * + * @property ref The address to talk to the machine. + */ + data class Up(val ref: MachineRef) : MachineSupervisionEvent() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt new file mode 100644 index 00000000..37cf9d44 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.resources.compute.supervision + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.MachineRef +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An interface for supervising [Machine] instances. + */ +interface MachineSupervisor { + /** + * This method is invoked when a new machine is introduced to the supervisor by specifying its static information + * and address. + * + * @param machine The machine that is being announced. + * @param ref The address to talk to the host. + */ + fun onAnnounce(machine: Machine, ref: MachineRef) + + /** + * This method is invoked when a process exits. + * + * @param ref The address to talk to the machine. + */ + fun onUp(ref: MachineRef) + + companion object { + /** + * Create the [Behavior] for a [MachineSupervisor]. + * + * @param supervisor The supervisor to create the behavior for. + */ + suspend operator fun invoke(ctx: ProcessContext, supervisor: MachineSupervisor, main: ReceiveRef<MachineSupervisionEvent>) { + val inlet = ctx.listen(main) + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is MachineSupervisionEvent.Announce -> supervisor.onAnnounce(msg.machine, msg.ref) + is MachineSupervisionEvent.Up -> supervisor.onUp(msg.ref) + } + } + } + + inlet.close() + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/Service.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/Service.kt new file mode 100644 index 00000000..8dcec760 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/Service.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services + +import com.atlarge.opendc.core.Identity +import java.util.UUID + +/** + * An interface for identifying service implementations of the same type (providing the same service). + * + * @param T The shape of the messages the service responds to. + */ +interface Service<T : Any> : Identity + +/** + * Helper class for constructing a [Service]. + * + * @property uid The unique identifier of the service. + * @property name The name of the service. + */ +abstract class AbstractService<T : Any>(override val uid: UUID, override val name: String) : Service<T> { + override fun equals(other: Any?): Boolean = other is Service<*> && uid == other.uid + override fun hashCode(): Int = uid.hashCode() + override fun toString(): String = "Service[uid=$uid,name=$name]" +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceMap.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceMap.kt new file mode 100644 index 00000000..14cf4845 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceMap.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services + +import com.atlarge.odcsim.SendRef + +/** + * A map containing services. + */ +interface ServiceMap { + /** + * Determine if this map contains the service with the specified [Service]. + * + * @param key The key of the service to check for. + * @return `true` if the service is in the map, `false` otherwise. + */ + operator fun contains(key: Service<*>): Boolean + + /** + * Obtain the service with the specified [Service]. + * + * @param key The key of the service to obtain. + * @return The references to the service. + * @throws IllegalArgumentException if the key does not exists in the map. + */ + operator fun <T : Any> get(key: Service<T>): SendRef<T> +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt new file mode 100644 index 00000000..3592d578 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.Identity +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.ZoneMessage +import java.util.UUID + +/** + * An abstract representation of a cloud service implementation provided by a cloud platform. + */ +interface ServiceProvider : Identity { + /** + * The unique identifier of the service implementation. + */ + override val uid: UUID + + /** + * The name of the service implementation. + */ + override val name: String + + /** + * The set of services provided by this [ServiceProvider]. + */ + val provides: Set<Service<*>> + + /** + * The dependencies of the service implementation. + */ + val dependencies: Set<Service<*>> + + /** + * Build the runtime [Behavior] for this service. + * + * @param zone The zone model for which the service should be build. + * @param zoneRef The runtime reference to the zone's actor for communication. + * @param main The channel on which the service should listen. + */ + suspend operator fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef<ZoneMessage>, main: Channel<Any>) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt new file mode 100644 index 00000000..604e1942 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services.provisioning + +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.services.AbstractService +import com.atlarge.opendc.core.services.Service +import com.atlarge.opendc.core.services.ServiceProvider +import com.atlarge.opendc.core.services.resources.HostView +import java.util.UUID + +/** + * A cloud platform service that provisions the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +abstract class ProvisioningService : ServiceProvider { + override val provides: Set<Service<*>> = setOf(ProvisioningService) + + /** + * The service key of the provisioner service. + */ + companion object : AbstractService<ProvisioningMessage>(UUID.randomUUID(), "provisioner") +} + +/** + * A message protocol for communicating to the resource provisioner. + */ +sealed class ProvisioningMessage { + /** + * Request the specified number of resources from the provisioner. + * + * @property numHosts The number of hosts to request from the provisioner. + * @property replyTo The actor to reply to. + */ + data class Request(val numHosts: Int, val replyTo: SendRef<ProvisioningResponse.Lease>) : ProvisioningMessage() + + /** + * Release the specified resource [ProvisioningResponse.Lease]. + * + * @property lease The lease to release. + */ + data class Release(val lease: ProvisioningResponse.Lease) : ProvisioningMessage() +} + +/** + * A message protocol used by the resource provisioner to respond to [ProvisioningMessage]s. + */ +sealed class ProvisioningResponse { + /** + * A lease for the specified hosts. + */ + data class Lease(val hosts: List<HostView>) : ProvisioningResponse() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt new file mode 100644 index 00000000..5f77e1a1 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt @@ -0,0 +1,107 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services.provisioning + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.ask +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.ZoneMessage +import com.atlarge.opendc.core.find +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.services.Service +import com.atlarge.opendc.core.services.resources.HostView +import com.atlarge.opendc.core.services.resources.ResourceManagementMessage +import com.atlarge.opendc.core.services.resources.ResourceManagementResponse +import com.atlarge.opendc.core.services.resources.ResourceManagementService +import java.util.ArrayDeque +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive + +/** + * A cloud platform service that provisions the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +object SimpleProvisioningService : ProvisioningService() { + override val uid: UUID = UUID.randomUUID() + override val name: String = "simple-provisioner" + override val dependencies: Set<Service<*>> = setOf(ResourceManagementService) + + /** + * Build the runtime [Behavior] for the resource provisioner, responding to messages of shape [ProvisioningMessage]. + */ + override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef<ZoneMessage>, main: Channel<Any>) { + val inlet = ctx.listen(main.receive) + val manager = zoneRef.find(ResourceManagementService) + + delay(10) + + val hosts = mutableMapOf<MachineRef, HostView>() + val available = ArrayDeque<HostView>() + val leases = mutableSetOf<ProvisioningResponse.Lease>() + + // Subscribe to all machines in the zone + for (cluster in zone.clusters) { + for (host in cluster.hosts) { + val msg: ResourceManagementResponse.Listing = manager.ask { ResourceManagementMessage.Lookup(host, it) } + if (msg.instance != null) { + hosts[msg.instance.ref] = msg.instance + available.add(msg.instance) + } + } + } + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is ProvisioningMessage.Request -> { + println("Provisioning ${msg.numHosts} hosts") + val leaseHosts = mutableListOf<HostView>() + while (available.isNotEmpty() && leaseHosts.size < msg.numHosts) { + leaseHosts += available.poll() + } + val lease = ProvisioningResponse.Lease(leaseHosts) + leases += lease + msg.replyTo.sendOnce(lease) + } + is ProvisioningMessage.Release -> { + val lease = msg.lease + if (lease in leases) { + return@coroutineScope + } + available.addAll(lease.hosts) + leases -= lease + } + } + } + } + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt new file mode 100644 index 00000000..60dd2eb9 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services.resources + +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.resources.compute.MachineStatus +import com.atlarge.opendc.core.resources.compute.host.Host + +/** + * The dynamic information of a [Host] instance that is being tracked by the [ResourceManagementService]. This means + * that information may not be up-to-date. + * + * @property host The static information of the host. + * @property ref The reference to the host's actor. + * @property status The status of the machine. + */ +data class HostView(val host: Host, val ref: MachineRef, val status: MachineStatus = MachineStatus.HALT) { + override fun equals(other: Any?): Boolean = other is HostView && host.uid == other.host.uid + override fun hashCode(): Int = host.uid.hashCode() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt new file mode 100644 index 00000000..cc032952 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt @@ -0,0 +1,120 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.services.resources + +import com.atlarge.odcsim.Channel +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.ZoneMessage +import com.atlarge.opendc.core.resources.compute.MachineRef +import com.atlarge.opendc.core.resources.compute.MachineStatus +import com.atlarge.opendc.core.resources.compute.host.Host +import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.core.services.Service +import com.atlarge.opendc.core.services.ServiceProvider +import java.util.UUID +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * A cloud platform service that manages the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +object ResourceManagementService : ServiceProvider, Service<ResourceManagementMessage> { + override val uid: UUID = UUID.randomUUID() + override val name: String = "resource-manager" + override val provides: Set<Service<*>> = setOf(ResourceManagementService) + override val dependencies: Set<Service<*>> = emptySet() + + /** + * Build the runtime behavior of the [ResourceManagementService]. + */ + override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef<ZoneMessage>, main: Channel<Any>) { + // Launch the clusters of the zone + for (cluster in zone.clusters) { + ctx.spawn({ cluster(it, main.send) }, name = "${cluster.name}-${cluster.uid}") + } + + val hosts = mutableMapOf<MachineRef, HostView>() + val inlet = ctx.listen(main.receive) + + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is MachineSupervisionEvent.Announce -> { + val host = msg.machine as? Host + if (host != null) { + hosts[msg.ref] = HostView(host, msg.ref) + } + } + is MachineSupervisionEvent.Up -> { + hosts.computeIfPresent(msg.ref) { _, value -> + value.copy(status = MachineStatus.RUNNING) + } + } + is ResourceManagementMessage.Lookup -> { + msg.replyTo.sendOnce(ResourceManagementResponse.Listing(hosts.values.find { it.host == msg.host })) + } + } + } + } + } +} + +/** + * A reference to the resource manager of a zone. + */ +typealias ResourceManagerRef = SendRef<ResourceManagementMessage> + +/** + * A message protocol for communicating to the resource manager. + */ +sealed class ResourceManagementMessage { + /** + * Lookup the specified [Host]. + * + * @property host The host to lookup. + * @property replyTo The address to sent the response to. + */ + data class Lookup( + val host: Host, + val replyTo: SendRef<ResourceManagementResponse.Listing> + ) : ResourceManagementMessage() +} + +/** + * A message protocol used by the resource manager to respond to [ResourceManagementMessage]s. + */ +sealed class ResourceManagementResponse { + /** + * A response to a [ResourceManagementMessage.Lookup] request. + * + * @property instance The instance that was found or `null` if it does not exist. + */ + data class Listing(val instance: HostView?) : ResourceManagementResponse() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/Workload.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/Workload.kt new file mode 100644 index 00000000..def5d6e4 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/Workload.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload + +import com.atlarge.opendc.core.Identity +import com.atlarge.opendc.core.User + +/** + * A high-level abstraction that represents the actual work that a set of compute resources perform, such + * as running an application on a machine or a whole workflow running multiple tasks on numerous machines. + */ +interface Workload : Identity { + /** + * The owner of this workload. + */ + val owner: User +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt new file mode 100644 index 00000000..d1ccd347 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.opendc.core.workload.Workload + +/** + * A generic representation of a workload that can directly be executed by physical or virtual compute resources, + * such as a web server application. + */ +interface Application : Workload { + /** + * The number of processing elements required by the task. + */ + val cores: Int + + /** + * Build the runtime [Behavior] of an application, accepting messages of [ProcessMessage]. + * + * This is a model for the runtime behavior of an application instance (process) that describes how an application + * instance consumes the allocated resources on a machine. + */ + suspend operator fun invoke(ctx: ProcessContext, pid: Pid, main: ReceiveRef<ProcessMessage>) +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt new file mode 100644 index 00000000..a2dbacf1 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt @@ -0,0 +1,114 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.odcsim.sendOnce +import com.atlarge.opendc.core.User +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import java.util.UUID +import kotlin.math.ceil +import kotlin.math.min +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An [Application] implementation that models applications performing a static number of floating point operations + * ([flops]) on a compute resource. + * + * @property uid A unique identifier for this application. + * @property name The name of the application. + * @property owner The owner of this application. + * @property cores The number of cores needed for this application. + * @property flops The number of floating point operations to perform for this task. + */ +class FlopsApplication( + override val uid: UUID, + override val name: String, + override val owner: User, + override val cores: Int, + val flops: Long +) : Application { + + init { + require(flops >= 0) { "Negative number of flops" } + } + + /** + * Build the runtime [Behavior] based on a number of floating point operations to execute. + */ + override suspend fun invoke(ctx: ProcessContext, pid: Pid, main: ReceiveRef<ProcessMessage>) { + val inlet = ctx.listen(main) + var remaining = flops + var start: Long = 0 + lateinit var allocation: Map<ProcessingElement, Double> + + val created = inlet.receive() as ProcessMessage.Setup + val ref = created.ref + + ref.sendOnce(ProcessEvent.Ready(pid)) + + suspend fun processAllocation(resources: Map<ProcessingElement, Double>, until: Long) { + start = ctx.clock.millis() + allocation = resources + .asSequence() + .take(cores) + .associateBy({ it.key }, { it.value }) + + val speed = allocation.asSequence() + .map { (key, value) -> key.unit.clockRate * value } + .average() + val finishedAt = ceil(ctx.clock.millis() + remaining / speed).toLong() + ref.sendOnce(ProcessEvent.Consume(pid, allocation, min(finishedAt, until))) + } + + var msg = inlet.receive() as ProcessMessage.Allocation + processAllocation(msg.resources, msg.until) + + coroutineScope { + while (isActive) { + msg = inlet.receive() as ProcessMessage.Allocation + + /* Compute the consumption of flops */ + val consumed = allocation.asSequence() + .map { (key, value) -> key.unit.clockRate * value * (ctx.clock.millis() - start) } + .sum() + // Ceil to prevent consumed flops being rounded to 0 + remaining -= ceil(consumed).toLong() + + /* Test whether all flops have been consumed and the task is finished */ + if (remaining <= 0) { + ref.sendOnce(ProcessEvent.Exit(pid, 0)) + break + } + processAllocation(msg.resources, msg.until) + } + } + + inlet.close() + } +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt new file mode 100644 index 00000000..fc70b924 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt @@ -0,0 +1,90 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload.application + +import com.atlarge.odcsim.SendRef +import com.atlarge.opendc.core.resources.compute.Machine +import com.atlarge.opendc.core.resources.compute.ProcessingElement + +/** + * The process id (pid) is a reference to the application instance (process) that accepts messages of + * type [ProcessMessage]. + */ +typealias Pid = SendRef<ProcessMessage> + +/** + * A message protocol for actors to communicate with task instances (called processes). + */ +sealed class ProcessMessage { + /** + * Indicate that the task should be installed to the specified machine. + * + * @property machine The machine to install the task. + * @property ref The reference to the machine instance. + */ + data class Setup(val machine: Machine, val ref: SendRef<ProcessEvent>) : ProcessMessage() + + /** + * Indicate an allocation of compute resources on a machine for a certain duration. + * The task may assume that the reservation occurs after installation on the same machine. + * + * @property resources The cpu cores (and the utilization percentages) allocated for the task. + * @property until The point in time till which the reservation is valid. + */ + data class Allocation(val resources: Map<ProcessingElement, Double>, val until: Long) : ProcessMessage() +} + +/** + * The message protocol used by application instances respond to [ProcessMessage]s. + */ +sealed class ProcessEvent { + /** + * Indicate that the process is ready to start processing. + * + * @property pid A reference to the application instance. + */ + data class Ready(val pid: Pid) : ProcessEvent() + + /** + * Indicate the estimated resource utilization of the task until a specified point in time. + * + * @property pid A reference to the application instance of the represented utilization. + * @property utilization The utilization of the cpu cores as a percentage. + * @property until The point in time until which the utilization is valid. + */ + data class Consume( + val pid: Pid, + val utilization: Map<ProcessingElement, Double>, + val until: Long + ) : ProcessEvent() + + /** + * Indicate that a process has been terminated. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + data class Exit(val pid: Pid, val status: Int) : ProcessEvent() +} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt new file mode 100644 index 00000000..fefd6c88 --- /dev/null +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt @@ -0,0 +1,83 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.core.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.ReceiveRef +import com.atlarge.opendc.core.resources.compute.ProcessingElement +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.isActive + +/** + * An interface for supervising processes. + */ +interface ProcessSupervisor { + /** + * This method is invoked when the setup of an application completed successfully. + * + * @param pid The process id of the process that has been initialized. + */ + fun onReady(pid: Pid) {} + + /** + * This method is invoked when a process informs the machine that it is running with the + * estimated resource utilization until a specified point in time. + * + * @param pid The process id of the process that is running. + * @param utilization The utilization of the cpu cores as a percentage. + * @param until The point in time until which the utilization is valid. + */ + fun onConsume(pid: Pid, utilization: Map<ProcessingElement, Double>, until: Long) {} + + /** + * This method is invoked when a process exits. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + fun onExit(pid: Pid, status: Int) {} + + companion object { + /** + * Create the [Behavior] for a [ProcessSupervisor]. + * + * @param supervisor The supervisor to create the behavior for. + */ + suspend operator fun invoke(ctx: ProcessContext, supervisor: ProcessSupervisor, main: ReceiveRef<ProcessEvent>) { + val inlet = ctx.listen(main) + coroutineScope { + while (isActive) { + when (val msg = inlet.receive()) { + is ProcessEvent.Ready -> supervisor.onReady(msg.pid) + is ProcessEvent.Consume -> supervisor.onConsume(msg.pid, msg.utilization, msg.until) + is ProcessEvent.Exit -> supervisor.onExit(msg.pid, msg.status) + } + } + } + inlet.close() + } + } +} |
