diff options
75 files changed, 4953 insertions, 0 deletions
diff --git a/opendc-core/build.gradle.kts b/opendc-core/build.gradle.kts new file mode 100644 index 00000000..c89e27e6 --- /dev/null +++ b/opendc-core/build.gradle.kts @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":odcsim-core")) + + implementation(kotlin("stdlib")) + + testImplementation(project(":odcsim-testkit")) + testImplementation(project(":odcsim-engine-omega")) + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") + testRuntimeOnly("org.slf4j:slf4j-simple:1.7.25") + testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.0.0") +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Broker.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Broker.kt new file mode 100644 index 00000000..0e2adb40 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Broker.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.Behavior + +/** + * A broker acting on the various cloud platforms on behalf of the user. + */ +interface Broker { + /** + * Build the runtime behavior of the [Broker]. + * + * @param platforms A list of available cloud platforms. + * @return The runtime behavior of the broker. + */ + operator fun invoke(platforms: List<PlatformRef>): Behavior<*> +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Cluster.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Cluster.kt new file mode 100644 index 00000000..1ed11e87 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Cluster.kt @@ -0,0 +1,56 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.empty +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unsafeCast +import com.atlarge.opendc.model.resources.compute.host.Host +import com.atlarge.opendc.model.services.resources.ResourceManagerRef +import java.util.UUID + +/** + * A logical grouping of heterogeneous hosts and primary storage within a zone. + * + * @property uid The unique identifier of the cluster. + * @property name The name of this cluster. + * @property hosts The machines in this cluster. + */ +data class Cluster(override val uid: UUID, override val name: String, val hosts: List<Host>) : Identity { + /** + * Build the runtime [Behavior] of this cluster of hosts. + * + * @param manager The manager of the cluster. + */ + operator fun invoke(manager: ResourceManagerRef): Behavior<Nothing> = setup { ctx -> + // Launch all hosts in the cluster + for (host in hosts) { + ctx.spawn(host(manager.unsafeCast()), name = host.name) + } + + empty() + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Environment.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Environment.kt new file mode 100644 index 00000000..b7fe33db --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Environment.kt @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +/** + * A description of a large-scale computing environment. This description includes including key size and topology + * information of the environment, types of resources, but also various operational and management rules such as + * scheduled maintenance, allocation and other constraints. + * + * @property name The name of the environment. + * @property description A small textual description about the environment that is being modeled + * @property platforms The cloud platforms (such as AWS or GCE) in this environment. + */ +data class Environment(val name: String, val description: String?, val platforms: List<Platform>) diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Identity.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Identity.kt new file mode 100644 index 00000000..2455d138 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Identity.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import java.util.UUID + +/** + * An object that has a unique identity. + */ +interface Identity { + /** + * A unique, opaque, system-generated value, representing the object. + */ + val uid: UUID + + /** + * A non-empty, human-readable string representing the object. + */ + val name: String +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Model.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Model.kt new file mode 100644 index 00000000..098bfbde --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Model.kt @@ -0,0 +1,71 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Terminated +import com.atlarge.odcsim.receiveSignal +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.stopped +import com.atlarge.odcsim.unhandled + +/** + * A simulation model for large-scale simulation of datacenter infrastructure, built with the *odcsim* API. + * + * @property environment The environment in which brokers operate. + * @property brokers The brokers acting on the cloud platforms. + */ +data class Model(val environment: Environment, val brokers: List<Broker>) { + /** + * Build the runtime behavior of the universe. + */ + operator fun invoke(): Behavior<Nothing> = setup { ctx -> + // Setup the environment + val platforms = environment.platforms.map { platform -> + ctx.spawn(platform(), name = platform.name) + } + + // Launch all brokers + val remaining = mutableSetOf<ActorRef<*>>() + for (broker in brokers) { + val ref = ctx.spawnAnonymous(broker(platforms)) + ctx.watch(ref) + remaining += ref + } + + receiveSignal { _, signal -> + when (signal) { + is Terminated -> { + remaining -= signal.ref + if (remaining.isEmpty()) stopped() else same() + } + else -> + unhandled() + } + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Platform.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Platform.kt new file mode 100644 index 00000000..129ee3a9 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Platform.kt @@ -0,0 +1,103 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.coroutines.actorContext +import com.atlarge.odcsim.coroutines.dsl.ask +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import java.util.UUID + +/** + * A representation of a cloud platform such as Amazon Web Services (AWS), Microsoft Azure or Google Cloud. + * + * @property uid The unique identifier of this datacenter. + * @property name the name of the platform. + * @property zones The availability zones available on this platform. + */ +data class Platform(override val uid: UUID, override val name: String, val zones: List<Zone>) : Identity { + /** + * Build the runtime [Behavior] of this cloud platform. + */ + operator fun invoke(): Behavior<PlatformMessage> = setup { ctx -> + ctx.log.info("Starting cloud platform {} [{}] with {} zones", name, uid, zones.size) + + // Launch all zones of the cloud platform + val zoneInstances = zones.associateWith { zone -> + ctx.spawn(zone(), name = zone.name) + } + + receiveMessage { msg -> + when (msg) { + is PlatformMessage.ListZones -> { + ctx.send(msg.replyTo, PlatformResponse.Zones(ctx.self, zoneInstances.mapKeys { it.key.name })) + same() + } + } + } + } +} + +/** + * A reference to the actor managing the zone. + */ +typealias PlatformRef = ActorRef<PlatformMessage> + +/** + * A message protocol for communicating with a cloud platform. + */ +sealed class PlatformMessage { + /** + * Request the available zones on this platform. + * + * @property replyTo The actor address to send the response to. + */ + data class ListZones(val replyTo: ActorRef<PlatformResponse.Zones>) : PlatformMessage() +} + +/** + * A message protocol used by platform actors to respond to [PlatformMessage]s. + */ +sealed class PlatformResponse { + /** + * The zones available on this cloud platform. + * + * @property platform The reference to the cloud platform these are the zones of. + * @property zones The zones in this cloud platform. + */ + data class Zones(val platform: PlatformRef, val zones: Map<String, ZoneRef>) : PlatformResponse() +} + +/** + * Retrieve the available zones of a platform. + */ +suspend fun PlatformRef.zones(): Map<String, ZoneRef> { + val ctx = actorContext<Any>() + val zones: PlatformResponse.Zones = ctx.ask(this) { PlatformMessage.ListZones(it) } + return zones.zones +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/User.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/User.kt new file mode 100644 index 00000000..502f7a74 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/User.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +/** + * A user of the cloud network. + */ +interface User : Identity { + /** + * The name of the user. + */ + override val name: String +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Zone.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Zone.kt new file mode 100644 index 00000000..b1d00dac --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/Zone.kt @@ -0,0 +1,212 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.Terminated +import com.atlarge.odcsim.coroutines.actorContext +import com.atlarge.odcsim.coroutines.dsl.ask +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unhandled +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.ServiceProvider +import java.util.ArrayDeque +import java.util.UUID + +/** + * An isolated location within a datacenter region from which public cloud services operate, roughly equivalent to a + * single datacenter. Zones contain one or more clusters and secondary storage. + * + * This class models *only* the static information of a zone, with dynamic information being contained within the zone's + * actor. During runtime, it's actor acts as a registry for all the cloud services provided by the zone. + * + * @property uid The unique identifier of this availability zone + * @property name The name of the zone within its platform. + * @property services The initial set of services provided by the zone. + * @property clusters The clusters of machines in this zone. + */ +data class Zone( + override val uid: UUID, + override val name: String, + val services: Set<ServiceProvider>, + val clusters: List<Cluster> +) : Identity { + /** + * Build the runtime [Behavior] of this datacenter. + */ + operator fun invoke(): Behavior<ZoneMessage> = setup { ctx -> + ctx.log.info("Starting zone {} [{}]", name, uid) + + // Launch all services of the zone + val instances: MutableMap<Service<*>, ActorRef<*>> = mutableMapOf() + validateDependencies(services) + + for (provider in services) { + val ref = ctx.spawn(provider(this, ctx.self), name = "${provider.name}-${provider.uid}") + ctx.watch(ref) + provider.provides.forEach { instances[it] = ref } + } + + object : ReceivingBehavior<ZoneMessage>() { + override fun receive(ctx: ActorContext<ZoneMessage>, msg: ZoneMessage): Behavior<ZoneMessage> { + return when (msg) { + is ZoneMessage.Find -> { + ctx.send(msg.replyTo, ZoneResponse.Listing(ctx.self, msg.key, instances[msg.key])) + same() + } + } + } + + override fun receiveSignal(ctx: ActorContext<ZoneMessage>, signal: Signal): Behavior<ZoneMessage> { + return when (signal) { + is Terminated -> { + instances.values.remove(signal.ref) + same() + } + else -> + unhandled() + } + } + } + } + + /** + * Validate the service for unsatisfiable dependencies. + */ + private fun validateDependencies(providers: Set<ServiceProvider>) { + val providersByKey = HashMap<Service<*>, ServiceProvider>() + for (provider in providers) { + if (provider.provides.isEmpty()) { + throw IllegalArgumentException(("Service provider $provider does not provide any service.")) + } + for (key in provider.provides) { + if (key in providersByKey) { + throw IllegalArgumentException("Multiple providers for service $key") + } + providersByKey[key] = provider + } + } + + val visited = HashSet<ServiceProvider>() + val queue = ArrayDeque(providers) + while (queue.isNotEmpty()) { + val service = queue.poll() + visited.add(service) + + for (dependencyKey in service.dependencies) { + val dependency = providersByKey[dependencyKey] + ?: throw IllegalArgumentException("Dependency $dependencyKey not satisfied for service $service") + if (dependency !in visited) { + queue.add(dependency) + } + } + } + } + + override fun equals(other: Any?): Boolean = other is Zone && uid == other.uid + override fun hashCode(): Int = uid.hashCode() +} + +/** + * A reference to the actor managing the zone. + */ +typealias ZoneRef = ActorRef<ZoneMessage> + +/** + * A message protocol for communicating with the service registry + */ +sealed class ZoneMessage { + /** + * Lookup the specified service in this availability zone. + * + * @property key The key of the service to lookup. + * @property replyTo The address to reply to. + */ + data class Find( + val key: Service<*>, + val replyTo: ActorRef<ZoneResponse.Listing> + ) : ZoneMessage() +} + +/** + * A message protocol used by service registry actors to respond to [ZoneMessage]s. + */ +sealed class ZoneResponse { + /** + * The response sent when looking up services in a zone. + * + * @property zone The zone from which the response originates. + * @property key The key of the service that was looked up. + * @property ref The reference to the service or `null` if it is not present in the zone. + */ + data class Listing( + val zone: ZoneRef, + val key: Service<*>, + private val ref: ActorRef<*>? + ) : ZoneResponse() { + /** + * A flag to indicate whether the service is present. + */ + val isPresent: Boolean + get() = ref != null + + /** + * Determine whether this listing is for the specified key. + * + * @param key The key to check for. + * @return `true` if the listing is for this key, `false` otherwise. + */ + fun isForKey(key: Service<*>): Boolean = key == this.key + + /** + * Extract the result from the service lookup. + * + * @param key The key of the lookup. + * @return The reference to the service or `null` if it is not present in the zone. + */ + operator fun <T : Any> invoke(key: Service<T>): ActorRef<T>? { + require(this.key == key) { "Invalid key" } + @Suppress("UNCHECKED_CAST") + return ref as? ActorRef<T> + } + } +} + +/** + * Find the reference to the specified [ServiceProvider]. + * + * @param key The key of the service to find. + * @throws IllegalArgumentException if the service is not found. + */ +suspend fun <T : Any> ZoneRef.find(key: Service<T>): ActorRef<T> { + val ctx = actorContext<Any>() + val listing: ZoneResponse.Listing = ctx.ask(this) { ZoneMessage.Find(key, it) } + return listing(key) ?: throw IllegalArgumentException("Unknown key $key") +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/Machine.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/Machine.kt new file mode 100644 index 00000000..08d94e14 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/Machine.kt @@ -0,0 +1,105 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.opendc.model.Identity +import com.atlarge.opendc.model.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid + +/** + * A generic representation of a compute node (either physical or virtual) that is able to run [Application]s. + */ +interface Machine : Identity { + /** + * The details of the machine in key/value pairs. + */ + val details: Map<String, Any> + + /** + * Build the runtime [Behavior] of this compute resource, accepting messages of [MachineMessage]. + * + * @param supervisor The supervisor of the machine. + */ + operator fun invoke(supervisor: ActorRef<MachineSupervisionEvent>): Behavior<MachineMessage> +} + +/** + * A reference to a machine instance that accepts messages of type [MachineMessage]. + */ +typealias MachineRef = ActorRef<MachineMessage> + +/** + * A message protocol for communicating with machine instances. + */ +sealed class MachineMessage { + /** + * Launch the specified [Application] on the machine instance. + * + * @property application The application to submit. + * @property key The key to identify this submission. + * @property broker The broker of the process to spawn. + */ + data class Submit( + val application: Application, + val key: Any, + val broker: ActorRef<MachineEvent> + ) : MachineMessage() +} + +/** + * A message protocol used by machine instances to respond to [MachineMessage]s. + */ +sealed class MachineEvent { + /** + * Indicate that an [Application] was spawned on a machine instance. + * + * @property instance The machine instance to which the application was submitted. + * @property application The application that has been submitted. + * @property key The key used to identify the submission. + * @property pid The spawned application instance. + */ + data class Submitted( + val instance: MachineRef, + val application: Application, + val key: Any, + val pid: Pid + ) : MachineEvent() + + /** + * Indicate that an [Application] has terminated on the specified machine. + * + * @property instance The machine instance to which the application was submitted. + * @property pid The reference to the application instance that has terminated. + * @property status The exit code of the task, where zero means successful. + */ + data class Terminated( + val instance: MachineRef, + val pid: Pid, + val status: Int = 0 + ) : MachineEvent() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/MachineStatus.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/MachineStatus.kt new file mode 100644 index 00000000..f4fe9494 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/MachineStatus.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute + +/** + * The status of a machine. + */ +enum class MachineStatus { + HALT, + RUNNING +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingElement.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingElement.kt new file mode 100644 index 00000000..9e5b1b71 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingElement.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute + +/** + * A logical core in a CPU. + * + * @property id The identifier of the core within the CPU. + * @property unit The [ProcessingUnit] the core is part of. + */ +data class ProcessingElement(val id: Int, val unit: ProcessingUnit) diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingUnit.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingUnit.kt new file mode 100644 index 00000000..025087a4 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/ProcessingUnit.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute + +/** + * A processing unit of a compute resource, either virtual or physical. + * + * @property vendor The vendor string of the cpu. + * @property family The cpu family number. + * @property model The model number of the cpu. + * @property modelName The name of the cpu model. + * @property clockRate The clock speed of the cpu in MHz. + * @property cores The number of logical cores in the cpu. + */ +data class ProcessingUnit( + val vendor: String, + val family: Int, + val model: Int, + val modelName: String, + val clockRate: Double, + val cores: Int +) diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/host/Host.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/host/Host.kt new file mode 100644 index 00000000..ca7ea204 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/host/Host.kt @@ -0,0 +1,86 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.host + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.join +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unhandled +import com.atlarge.odcsim.withTimers +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineMessage +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.resources.compute.scheduling.MachineScheduler +import com.atlarge.opendc.model.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.ProcessSupervisor +import java.util.UUID + +/** + * A physical compute node in a datacenter that is able to run [Application]s. + * + * @property uid The unique identifier of this machine. + * @property name The name of the machine. + * @property scheduler The process scheduler of this machine. + * @property cores The list of processing elements in the machine. + * @property details The details of this host. + */ +data class Host( + override val uid: UUID, + override val name: String, + val scheduler: MachineScheduler, + val cores: List<ProcessingElement>, + override val details: Map<String, Any> = emptyMap() +) : Machine { + /** + * Build the [Behavior] for a physical machine. + */ + override fun invoke(supervisor: ActorRef<MachineSupervisionEvent>): Behavior<MachineMessage> = setup { ctx -> + ctx.send(supervisor, MachineSupervisionEvent.Announce(this, ctx.self)) + ctx.send(supervisor, MachineSupervisionEvent.Up(ctx.self)) + + withTimers { timers -> + val sched = scheduler(this, ctx, timers) + sched.updateResources(cores) + receiveMessage<Any> { msg -> + when (msg) { + is MachineMessage.Submit -> { + sched.submit(msg.application, msg.key, msg.broker) + same() + } + else -> + unhandled() + } + }.join(ProcessSupervisor(sched).unsafeCast()).narrow() + } + } + + override fun equals(other: Any?): Boolean = other is Machine && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineScheduler.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineScheduler.kt new file mode 100644 index 00000000..de14f0fe --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineScheduler.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineMessage + +/** + * A factory interface for constructing a [MachineSchedulerLogic]. + */ +interface MachineScheduler { + /** + * Construct a [MachineSchedulerLogic] in the given [ActorContext]. + * + * @param machine The machine to create the scheduler for. + * @param ctx The actor context to construct a scheduler for. + * @param scheduler The timer scheduler to use. + */ + operator fun invoke( + machine: Machine, + ctx: ActorContext<MachineMessage>, + scheduler: TimerScheduler<MachineMessage> + ): MachineSchedulerLogic +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineSchedulerLogic.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineSchedulerLogic.kt new file mode 100644 index 00000000..879d6ed8 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/MachineSchedulerLogic.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.resources.compute.MachineMessage +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.ProcessSupervisor + +/** + * A scheduler that distributes processes over processing elements in a machine. + * + * @property machine The machine to create the scheduler for. + * @property ctx The context in which the scheduler runs. + * @property scheduler The timer scheduler to use. + */ +abstract class MachineSchedulerLogic( + protected val machine: Machine, + protected val ctx: ActorContext<MachineMessage>, + protected val scheduler: TimerScheduler<MachineMessage> +) : ProcessSupervisor { + /** + * Update the available resources in the machine. + * + * @param cores The available processing cores for the scheduler. + */ + abstract fun updateResources(cores: List<ProcessingElement>) + + /** + * Submit the specified application for scheduling. + * + * @param application The application to submit. + * @param key The key to identify the application instance. + * @param handler The broker of this application. + */ + abstract fun submit(application: Application, key: Any, handler: ActorRef<MachineEvent>) +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessObserver.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessObserver.kt new file mode 100644 index 00000000..11262d79 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessObserver.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid + +/** + * An interface for observing processes. + */ +interface ProcessObserver { + /** + * This method is invoked when the setup of an application completed successfully. + * + * @param pid The process id of the process that has been initialized. + */ + fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) + + /** + * This method is invoked when a process exits. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + fun onTermination(instance: MachineRef, pid: Pid, status: Int) + + companion object { + /** + * Create the [Behavior] for a [ProcessObserver]. + * + * @param observer The observer to create the behavior for. + */ + operator fun invoke(observer: ProcessObserver): Behavior<MachineEvent> = receiveMessage { msg -> + when (msg) { + is MachineEvent.Submitted -> observer.onSubmission(msg.instance, msg.application, msg.key, msg.pid) + is MachineEvent.Terminated -> observer.onTermination(msg.instance, msg.pid, msg.status) + } + same() + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessState.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessState.kt new file mode 100644 index 00000000..4c685fc4 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessState.kt @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +/** + * An enumeration of the distinct states of an application instance (process). + */ +enum class ProcessState { + /** + * Default state of a process, where the task is waiting to be assigned and installed on a machine. + */ + CREATED, + + /** + * State to indicate that the process is waiting to be ran. + */ + READY, + + /** + * State to indicate that the process is currently running. + */ + RUNNING, + + /** + * State to indicate that the process has been terminated, either successfully or due to failure. + */ + TERMINATED, +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessView.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessView.kt new file mode 100644 index 00000000..bc88e01f --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/ProcessView.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.ActorRef +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid +import com.atlarge.opendc.model.workload.application.ProcessMessage + +/** + * A process represents a application instance running on a particular machine from the machine scheduler's point of + * view. + * + * @property application The application this is an instance of. + * @property broker The broker of the process, which is informed about its progress. + * @property pid The reference to the application instance. + * @property state The state of the process. + */ +data class ProcessView( + val application: Application, + val broker: ActorRef<MachineEvent>, + val pid: Pid, + var state: ProcessState = ProcessState.CREATED +) { + /** + * The slice of processing elements allocated for the process. Available as soon as the state + * becomes [ProcessState.RUNNING] + */ + lateinit var allocation: ProcessMessage.Allocation +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/SpaceSharedMachineScheduler.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/SpaceSharedMachineScheduler.kt new file mode 100644 index 00000000..457c7070 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/scheduling/SpaceSharedMachineScheduler.kt @@ -0,0 +1,178 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.scheduling + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.odcsim.unsafeCast +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.resources.compute.MachineMessage +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid +import com.atlarge.opendc.model.workload.application.ProcessMessage +import com.atlarge.opendc.model.workload.application.ProcessSupervisor +import java.util.ArrayDeque +import java.util.UUID + +/** + * A machine scheduling policy where processes are space-shared on the machine. + * + * Space-sharing for machine scheduling means that all running processes will be allocated a separate set of the + * [ProcessingElement]s in a [Machine]. Applications are scheduled on the machine in first-in-first-out (FIFO) order, + * thus larger applications may block smaller tasks from proceeding, while space is available (no backfilling). + * + * @property machine The machine to create the scheduler for. + * @property ctx The context in which the scheduler runs. + * @property scheduler The timer scheduler to use. + */ +class SpaceSharedMachineScheduler( + machine: Machine, + ctx: ActorContext<MachineMessage>, + scheduler: TimerScheduler<MachineMessage> +) : MachineSchedulerLogic(machine, ctx, scheduler), ProcessSupervisor { + private var cores = 0 + private val available = ArrayDeque<ProcessingElement>() + private val queue = ArrayDeque<ActorRef<ProcessMessage>>() + private val running = LinkedHashSet<ActorRef<ProcessMessage>>() + private val processes = HashMap<ActorRef<ProcessMessage>, ProcessView>() + + override fun updateResources(cores: List<ProcessingElement>) { + available.addAll(cores) + this.cores = cores.size + + // Add all running tasks in front of the queue + running.reversed().forEach { queue.addFirst(it) } + running.clear() + + reschedule() + } + + override fun submit(application: Application, key: Any, handler: ActorRef<MachineEvent>) { + // Create application instance on the machine + val pid = ctx.spawn(application(), name = application.name + ":" + application.uid + ":" + UUID.randomUUID().toString()) + processes[pid] = ProcessView(application, handler, pid) + + // Setup the task + ctx.send(pid, ProcessMessage.Setup(machine, ctx.self.unsafeCast())) + + // Inform the owner that the task has been submitted + ctx.send(handler, MachineEvent.Submitted(ctx.self, application, key, pid)) + } + + /** + * Reschedule the tasks on this machine. + */ + private fun reschedule() { + while (queue.isNotEmpty()) { + val pid = queue.peek() + val process = processes[pid]!! + + if (process.application.cores >= cores) { + // The task will never fit on the machine + // TODO Fail task + ctx.log.warn("Process {} will not fit in machine: dropping.", process) + queue.remove() + return + } else if (process.application.cores > available.size) { + // The task will not fit at the moment + // Try again if resources become available + ctx.log.debug("Application queued: not enough processing elements available [requested={}, available={}]", + process.application.cores, available.size) + return + } + queue.remove() + + // Compute the available resources + val resources = List(process.application.cores) { + val pe = available.poll() + Pair(pe, 1.0) + }.toMap() + process.state = ProcessState.RUNNING + process.allocation = ProcessMessage.Allocation(resources, Instant.POSITIVE_INFINITY) + running += pid + + ctx.send(pid, process.allocation) + } + } + + override fun onReady(pid: Pid) { + val process = processes[pid]!! + + // Schedule the task if it has been setup + queue.add(pid) + process.state = ProcessState.READY + + reschedule() + } + + override fun onConsume(pid: Pid, utilization: Map<ProcessingElement, Double>, until: Instant) { + val process = processes[pid]!! + val allocation = process.allocation + + if (until > allocation.until) { + // Tasks are not allowed to extend allocation provided by the machine + // TODO Fail the task + ctx.log.warn("Task {} must not extend allocation provided by the machine", pid) + } else if (until < allocation.until) { + // Shrink allocation + process.allocation = allocation.copy(until = until) + } + + // Reschedule the process after the allocation expires + scheduler.after(pid, process.allocation.until - ctx.time) { + // We just extend the allocation + process.allocation = process.allocation.copy(until = Instant.POSITIVE_INFINITY) + ctx.send(pid, process.allocation) + } + } + + override fun onExit(pid: Pid, status: Int) { + val process = processes.remove(pid)!! + running -= pid + scheduler.cancel(pid) + process.allocation.resources.keys.forEach { available.add(it) } + + ctx.log.debug("Application {} terminated with status {}", pid, status) + + // Inform the owner that the task has terminated + ctx.send(process.broker, MachineEvent.Terminated(ctx.self, pid, status)) + + reschedule() + } + + companion object : MachineScheduler { + override fun invoke( + machine: Machine, + ctx: ActorContext<MachineMessage>, + scheduler: TimerScheduler<MachineMessage> + ): MachineSchedulerLogic { + return SpaceSharedMachineScheduler(machine, ctx, scheduler) + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisionEvent.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisionEvent.kt new file mode 100644 index 00000000..2b3fae3d --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisionEvent.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.supervision + +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineRef + +/** + * A supervision protocol for [Machine] instances. + */ +sealed class MachineSupervisionEvent { + /** + * Initialization message to introduce to the supervisor a new machine by specifying its static information and + * address. + * + * @property machine The machine that is being announced. + * @property ref The address to talk to the host. + */ + data class Announce(val machine: Machine, val ref: MachineRef) : MachineSupervisionEvent() + + /** + * Indicate that the specified machine has booted up. + * + * @property ref The address to talk to the machine. + */ + data class Up(val ref: MachineRef) : MachineSupervisionEvent() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisor.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisor.kt new file mode 100644 index 00000000..c3607a22 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/resources/compute/supervision/MachineSupervisor.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.resources.compute.supervision + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.MachineRef + +/** + * An interface for supervising [Machine] instances. + */ +interface MachineSupervisor { + /** + * This method is invoked when a new machine is introduced to the supervisor by specifying its static information + * and address. + * + * @param machine The machine that is being announced. + * @param ref The address to talk to the host. + */ + fun onAnnounce(machine: Machine, ref: MachineRef) + + /** + * This method is invoked when a process exits. + * + * @param ref The address to talk to the machine. + */ + fun onUp(ref: MachineRef) + + companion object { + /** + * Create the [Behavior] for a [MachineSupervisor]. + * + * @param supervisor The supervisor to create the behavior for. + */ + operator fun invoke(supervisor: MachineSupervisor): Behavior<MachineSupervisionEvent> = receiveMessage { msg -> + when (msg) { + is MachineSupervisionEvent.Announce -> supervisor.onAnnounce(msg.machine, msg.ref) + is MachineSupervisionEvent.Up -> supervisor.onUp(msg.ref) + } + same() + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/Service.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/Service.kt new file mode 100644 index 00000000..e8b25b88 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/Service.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services + +import com.atlarge.opendc.model.Identity +import java.util.UUID + +/** + * An interface for identifying service implementations of the same type (providing the same service). + * + * @param T The shape of the messages the service responds to. + */ +interface Service<T : Any> : Identity + +/** + * Helper class for constructing a [Service]. + * + * @property uid The unique identifier of the service. + * @property name The name of the service. + */ +abstract class AbstractService<T : Any>(override val uid: UUID, override val name: String) : Service<T> { + override fun equals(other: Any?): Boolean = other is Service<*> && uid == other.uid + override fun hashCode(): Int = uid.hashCode() + override fun toString(): String = "Service[uid=$uid,name=$name]" +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceMap.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceMap.kt new file mode 100644 index 00000000..d91208bf --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceMap.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services + +import com.atlarge.odcsim.ActorRef + +/** + * A map containing services. + */ +interface ServiceMap { + /** + * Determine if this map contains the service with the specified [Service]. + * + * @param key The key of the service to check for. + * @return `true` if the service is in the map, `false` otherwise. + */ + operator fun contains(key: Service<*>): Boolean + + /** + * Obtain the service with the specified [Service]. + * + * @param key The key of the service to obtain. + * @return The references to the service. + * @throws IllegalArgumentException if the key does not exists in the map. + */ + operator fun <T : Any> get(key: Service<T>): ActorRef<T> +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceProvider.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceProvider.kt new file mode 100644 index 00000000..1bf5b22e --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/ServiceProvider.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services + +import com.atlarge.odcsim.Behavior +import com.atlarge.opendc.model.Identity +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.ZoneRef +import java.util.UUID + +/** + * An abstract representation of a cloud service implementation provided by a cloud platform. + */ +interface ServiceProvider : Identity { + /** + * The unique identifier of the service implementation. + */ + override val uid: UUID + + /** + * The name of the service implementation. + */ + override val name: String + + /** + * The set of services provided by this [ServiceProvider]. + */ + val provides: Set<Service<*>> + + /** + * The dependencies of the service implementation. + */ + val dependencies: Set<Service<*>> + + /** + * Build the runtime [Behavior] for this service. + * + * @param zone The zone model for which the service should be build. + * @param ref The runtime reference to the zone's actor for communication. + */ + operator fun invoke(zone: Zone, ref: ZoneRef): Behavior<*> +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/ProvisioningService.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/ProvisioningService.kt new file mode 100644 index 00000000..22b70f35 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/ProvisioningService.kt @@ -0,0 +1,77 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.provisioning + +import com.atlarge.odcsim.ActorRef +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.services.AbstractService +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.ServiceProvider +import com.atlarge.opendc.model.services.resources.HostView +import java.util.UUID + +/** + * A cloud platform service that provisions the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +abstract class ProvisioningService : ServiceProvider { + override val provides: Set<Service<*>> = setOf(ProvisioningService) + + /** + * The service key of the provisioner service. + */ + companion object : AbstractService<ProvisioningMessage>(UUID.randomUUID(), "provisioner") +} + +/** + * A message protocol for communicating to the resource provisioner. + */ +sealed class ProvisioningMessage { + /** + * Request the specified number of resources from the provisioner. + * + * @property numHosts The number of hosts to request from the provisioner. + * @property replyTo The actor to reply to. + */ + data class Request(val numHosts: Int, val replyTo: ActorRef<ProvisioningResponse.Lease>) : ProvisioningMessage() + + /** + * Release the specified resource [ProvisioningResponse.Lease]. + * + * @property lease The lease to release. + */ + data class Release(val lease: ProvisioningResponse.Lease) : ProvisioningMessage() +} + +/** + * A message protocol used by the resource provisioner to respond to [ProvisioningMessage]s. + */ +sealed class ProvisioningResponse { + /** + * A lease for the specified hosts. + */ + data class Lease(val hosts: List<HostView>) : ProvisioningResponse() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/SimpleProvisioningService.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/SimpleProvisioningService.kt new file mode 100644 index 00000000..1ddc1b69 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/provisioning/SimpleProvisioningService.kt @@ -0,0 +1,125 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.provisioning + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.StashBuffer +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unhandled +import com.atlarge.odcsim.unsafeCast +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.ZoneMessage +import com.atlarge.opendc.model.ZoneRef +import com.atlarge.opendc.model.ZoneResponse +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.resources.ResourceManagementMessage +import com.atlarge.opendc.model.services.resources.ResourceManagementResponse +import com.atlarge.opendc.model.services.resources.ResourceManagementService +import com.atlarge.opendc.model.services.resources.ResourceManagerRef +import java.util.ArrayDeque +import java.util.UUID + +/** + * A cloud platform service that provisions the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +object SimpleProvisioningService : ProvisioningService() { + override val uid: UUID = UUID.randomUUID() + override val name: String = "simple-provisioner" + override val dependencies: Set<Service<*>> = setOf(ResourceManagementService) + + /** + * Build the runtime [Behavior] for the resource provisioner, responding to messages of shape [ProvisioningMessage]. + */ + override fun invoke(zone: Zone, ref: ZoneRef): Behavior<ProvisioningMessage> = setup { ctx -> + val buffer = StashBuffer<Any>(capacity = 30) + ctx.send(ref, ZoneMessage.Find(ResourceManagementService, ctx.self.unsafeCast())) + + receiveMessage<Any> { msg -> + when (msg) { + is ZoneResponse.Listing -> { + val service = msg(ResourceManagementService) ?: throw IllegalStateException("Resource management service not available") + buffer.unstashAll(ctx.unsafeCast(), active(zone, service).unsafeCast()) + } + else -> { + buffer.stash(msg) + same() + } + } + }.narrow() + } + + private fun active(zone: Zone, manager: ResourceManagerRef) = setup<ProvisioningMessage> { ctx -> + val hosts = mutableMapOf<MachineRef, HostView>() + val available = ArrayDeque<HostView>() + val leases = mutableSetOf<ProvisioningResponse.Lease>() + + // Subscribe to all machines in the zone + for (cluster in zone.clusters) { + for (host in cluster.hosts) { + ctx.send(manager, ResourceManagementMessage.Lookup(host, ctx.self.unsafeCast())) + } + } + + receiveMessage<Any> { msg -> + when (msg) { + is ProvisioningMessage.Request -> { + ctx.log.info("Provisioning {} hosts", msg.numHosts) + val leaseHosts = mutableListOf<HostView>() + while (available.isNotEmpty() && leaseHosts.size < msg.numHosts) { + leaseHosts += available.poll() + } + val lease = ProvisioningResponse.Lease(leaseHosts) + leases += lease + ctx.send(msg.replyTo, lease) + same() + } + is ProvisioningMessage.Release -> { + val lease = msg.lease + if (lease in leases) { + return@receiveMessage same() + } + available.addAll(lease.hosts) + leases -= lease + same() + } + is ResourceManagementResponse.Listing -> { + if (msg.instance != null) { + hosts[msg.instance.ref] = msg.instance + available.add(msg.instance) + } + same() + } + else -> + unhandled() + } + }.narrow() + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/HostView.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/HostView.kt new file mode 100644 index 00000000..943461cd --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/HostView.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.resources + +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.resources.compute.MachineStatus +import com.atlarge.opendc.model.resources.compute.host.Host + +/** + * The dynamic information of a [Host] instance that is being tracked by the [ResourceManagementService]. This means + * that information may not be up-to-date. + * + * @property host The static information of the host. + * @property ref The reference to the host's actor. + * @property status The status of the machine. + */ +data class HostView(val host: Host, val ref: MachineRef, val status: MachineStatus = MachineStatus.HALT) { + override fun equals(other: Any?): Boolean = other is HostView && host.uid == other.host.uid + override fun hashCode(): Int = host.uid.hashCode() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/ResourceManagementService.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/ResourceManagementService.kt new file mode 100644 index 00000000..5e38c6da --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/services/resources/ResourceManagementService.kt @@ -0,0 +1,121 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.resources + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.unhandled +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.ZoneRef +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.resources.compute.MachineStatus +import com.atlarge.opendc.model.resources.compute.host.Host +import com.atlarge.opendc.model.resources.compute.supervision.MachineSupervisionEvent +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.ServiceProvider +import java.util.UUID + +/** + * A cloud platform service that manages the native resources on the platform. + * + * This service assumes control over all hosts in its [Zone]. + */ +object ResourceManagementService : ServiceProvider, Service<ResourceManagementMessage> { + override val uid: UUID = UUID.randomUUID() + override val name: String = "resource-manager" + override val provides: Set<Service<*>> = setOf(ResourceManagementService) + override val dependencies: Set<Service<*>> = emptySet() + + /** + * Build the runtime behavior of the [ResourceManagementService]. + */ + override fun invoke(zone: Zone, ref: ZoneRef): Behavior<ResourceManagementMessage> = setup { ctx -> + // Launch the clusters of the zone + for (cluster in zone.clusters) { + ctx.spawn(cluster(ctx.self), name = "${cluster.name}-${cluster.uid}") + } + + val hosts = mutableMapOf<MachineRef, HostView>() + receiveMessage<Any> { msg -> + when (msg) { + is MachineSupervisionEvent.Announce -> { + val host = msg.machine as? Host + if (host != null) { + hosts[msg.ref] = HostView(host, msg.ref) + } + same() + } + is MachineSupervisionEvent.Up -> { + hosts.computeIfPresent(msg.ref) { _, value -> + value.copy(status = MachineStatus.RUNNING) + } + same() + } + is ResourceManagementMessage.Lookup -> { + ctx.send(msg.replyTo, ResourceManagementResponse.Listing(hosts.values.find { it.host == msg.host })) + same() + } + else -> + unhandled() + } + }.narrow() + } +} + +/** + * A reference to the resource manager of a zone. + */ +typealias ResourceManagerRef = ActorRef<ResourceManagementMessage> + +/** + * A message protocol for communicating to the resource manager. + */ +sealed class ResourceManagementMessage { + /** + * Lookup the specified [Host]. + * + * @property host The host to lookup. + * @property replyTo The address to sent the response to. + */ + data class Lookup( + val host: Host, + val replyTo: ActorRef<ResourceManagementResponse.Listing> + ) : ResourceManagementMessage() +} + +/** + * A message protocol used by the resource manager to respond to [ResourceManagementMessage]s. + */ +sealed class ResourceManagementResponse { + /** + * A response to a [ResourceManagementMessage.Lookup] request. + * + * @property instance The instance that was found or `null` if it does not exist. + */ + data class Listing(val instance: HostView?) : ResourceManagementResponse() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/Workload.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/Workload.kt new file mode 100644 index 00000000..c1215715 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/Workload.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload + +import com.atlarge.opendc.model.Identity +import com.atlarge.opendc.model.User + +/** + * A high-level abstraction that represents the actual work that a set of compute resources perform, such + * as running an application on a machine or a whole workflow running multiple tasks on numerous machines. + */ +interface Workload : Identity { + /** + * The owner of this workload. + */ + val owner: User +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Application.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Application.kt new file mode 100644 index 00000000..00ab98b6 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Application.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.opendc.model.workload.Workload + +/** + * A generic representation of a workload that can directly be executed by physical or virtual compute resources, + * such as a web server application. + */ +interface Application : Workload { + /** + * The number of processing elements required by the task. + */ + val cores: Int + + /** + * Build the runtime [Behavior] of an application, accepting messages of [ProcessMessage]. + * + * This is a model for the runtime behavior of an application instance (process) that describes how an application + * instance consumes the allocated resources on a machine. + */ + operator fun invoke(): Behavior<ProcessMessage> +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplication.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplication.kt new file mode 100644 index 00000000..60a896d4 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplication.kt @@ -0,0 +1,164 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.DeferredBehavior +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.receive +import com.atlarge.odcsim.stopped +import com.atlarge.odcsim.unhandled +import com.atlarge.opendc.model.User +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import java.lang.Double.min +import java.util.UUID +import kotlin.math.ceil + +/** + * An [Application] implementation that models applications performing a static number of floating point operations + * ([flops]) on a compute resource. + * + * @property uid A unique identifier for this application. + * @property name The name of the application. + * @property owner The owner of this application. + * @property cores The number of cores needed for this application. + * @property flops The number of floating point operations to perform for this task. + */ +class FlopsApplication( + override val uid: UUID, + override val name: String, + override val owner: User, + override val cores: Int, + val flops: Long +) : Application { + + init { + require(flops >= 0) { "Negative number of flops" } + } + + /** + * Build the runtime [Behavior] based on a number of floating point operations to execute. + */ + override fun invoke(): Behavior<ProcessMessage> = object : DeferredBehavior<ProcessMessage>() { + /** + * The remaining number of floating point operations to execute. + */ + var remaining = flops + + /** + * The machine to which the task is assigned. + */ + lateinit var machine: Machine + + /** + * The reference to the machine instance. + */ + lateinit var ref: ActorRef<ProcessEvent> + + /** + * The start of the last allocation + */ + var start: Instant = 0.0 + + /** + * The given resource allocation. + */ + lateinit var allocation: Map<ProcessingElement, Double> + + override fun invoke(ctx: ActorContext<ProcessMessage>) = created() + + /** + * Handle the initial, created state of a task instance. + */ + private fun created(): Behavior<ProcessMessage> = receive { ctx, msg -> + when (msg) { + is ProcessMessage.Setup -> { + machine = msg.machine + ref = msg.ref + /* TODO implement setup time */ + ctx.send(ref, ProcessEvent.Ready(ctx.self)) + ready().narrow() + } + else -> unhandled() + } + } + + /** + * Handle the ready state of a task instance. + */ + private fun ready(): Behavior<Any> = receive { ctx, msg -> + when (msg) { + is ProcessMessage.Allocation -> { + processAllocation(ctx, msg.resources, msg.until) + running() + } + else -> unhandled() + } + } + + /** + * Handle the running state of a task instance. + */ + private fun running(): Behavior<Any> = receive { ctx, msg -> + when (msg) { + is ProcessMessage.Allocation -> { + /* Compute the consumption of flops */ + val consumed = allocation.asSequence() + .map { (key, value) -> key.unit.clockRate * value * (ctx.time - start) } + .sum() + // Ceil to prevent consumed flops being rounded to 0 + remaining -= ceil(consumed).toLong() + + /* Test whether all flops have been consumed and the task is finished */ + if (remaining <= 0) { + ctx.send(ref, ProcessEvent.Exit(ctx.self, 0)) + return@receive stopped() + } + + processAllocation(ctx, msg.resources, msg.until) + running().narrow() + } + else -> unhandled() + } + } + + private fun processAllocation(ctx: ActorContext<Any>, resources: Map<ProcessingElement, Double>, until: Instant) { + start = ctx.time + allocation = resources + .asSequence() + .take(cores) + .associateBy({ it.key }, { it.value }) + + val speed = allocation.asSequence() + .map { (key, value) -> key.unit.clockRate * value } + .average() + val finishedAt = ctx.time + remaining / speed + ctx.send(ref, ProcessEvent.Consume(ctx.self, allocation, min(finishedAt, until))) + } + } +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Process.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Process.kt new file mode 100644 index 00000000..a78b8572 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/Process.kt @@ -0,0 +1,91 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Instant +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.ProcessingElement + +/** + * The process id (pid) is a reference to the application instance (process) that accepts messages of + * type [ProcessMessage]. + */ +typealias Pid = ActorRef<ProcessMessage> + +/** + * A message protocol for actors to communicate with task instances (called processes). + */ +sealed class ProcessMessage { + /** + * Indicate that the task should be installed to the specified machine. + * + * @property machine The machine to install the task. + * @property ref The reference to the machine instance. + */ + data class Setup(val machine: Machine, val ref: ActorRef<ProcessEvent>) : ProcessMessage() + + /** + * Indicate an allocation of compute resources on a machine for a certain duration. + * The task may assume that the reservation occurs after installation on the same machine. + * + * @property resources The cpu cores (and the utilization percentages) allocated for the task. + * @property until The point in time till which the reservation is valid. + */ + data class Allocation(val resources: Map<ProcessingElement, Double>, val until: Instant) : ProcessMessage() +} + +/** + * The message protocol used by application instances respond to [ProcessMessage]s. + */ +sealed class ProcessEvent { + /** + * Indicate that the process is ready to start processing. + * + * @property pid A reference to the application instance. + */ + data class Ready(val pid: Pid) : ProcessEvent() + + /** + * Indicate the estimated resource utilization of the task until a specified point in time. + * + * @property pid A reference to the application instance of the represented utilization. + * @property utilization The utilization of the cpu cores as a percentage. + * @property until The point in time until which the utilization is valid. + */ + data class Consume( + val pid: Pid, + val utilization: Map<ProcessingElement, Double>, + val until: Instant + ) : ProcessEvent() + + /** + * Indicate that a process has been terminated. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + data class Exit(val pid: Pid, val status: Int) : ProcessEvent() +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/ProcessSupervisor.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/ProcessSupervisor.kt new file mode 100644 index 00000000..2952c82e --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/model/workload/application/ProcessSupervisor.kt @@ -0,0 +1,79 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.opendc.model.resources.compute.ProcessingElement + +/** + * An interface for supervising processes. + */ +interface ProcessSupervisor { + /** + * This method is invoked when the setup of an application completed successfully. + * + * @param pid The process id of the process that has been initialized. + */ + fun onReady(pid: Pid) {} + + /** + * This method is invoked when a process informs the machine that it is running with the + * estimated resource utilization until a specified point in time. + * + * @param pid The process id of the process that is running. + * @param utilization The utilization of the cpu cores as a percentage. + * @param until The point in time until which the utilization is valid. + */ + fun onConsume(pid: Pid, utilization: Map<ProcessingElement, Double>, until: Instant) {} + + /** + * This method is invoked when a process exits. + * + * @property pid A reference to the application instance. + * @property status The exit code of the task, where zero means successful. + */ + fun onExit(pid: Pid, status: Int) {} + + companion object { + /** + * Create the [Behavior] for a [ProcessSupervisor]. + * + * @param supervisor The supervisor to create the behavior for. + */ + operator fun invoke(supervisor: ProcessSupervisor): Behavior<ProcessEvent> { + return receiveMessage { msg -> + when (msg) { + is ProcessEvent.Ready -> supervisor.onReady(msg.pid) + is ProcessEvent.Consume -> supervisor.onConsume(msg.pid, msg.utilization, msg.until) + is ProcessEvent.Exit -> supervisor.onExit(msg.pid, msg.status) + } + same() + } + } + } +} diff --git a/opendc-core/src/test/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplicationTest.kt b/opendc-core/src/test/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplicationTest.kt new file mode 100644 index 00000000..2cde1b6f --- /dev/null +++ b/opendc-core/src/test/kotlin/com/atlarge/opendc/model/workload/application/FlopsApplicationTest.kt @@ -0,0 +1,128 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.application + +import com.atlarge.odcsim.testkit.BehaviorTestKit +import com.atlarge.opendc.model.User +import com.atlarge.opendc.model.resources.compute.Machine +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.resources.compute.ProcessingUnit +import com.nhaarman.mockitokotlin2.mock +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import java.util.UUID + +/** + * A test suite for the [FlopsApplication]. + */ +@DisplayName("FlopsApplication") +internal class FlopsApplicationTest { + private val flops = 10000L + private val cores = 2 + private val machine: Machine = mock() + private val user: User = mock() + private val cpu: ProcessingUnit = ProcessingUnit("Intel", 6, 8600, "Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz", 2900.0, 1) + + private lateinit var application: FlopsApplication + + @BeforeEach + fun setUp() { + application = FlopsApplication(UUID.randomUUID(), "java", user, cores, flops) + } + + @Test + fun `should become ready after triggering installation`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox<ProcessEvent>() + test.run(ProcessMessage.Setup(machine, inbox.ref)) + inbox.expectMessage(ProcessEvent.Ready(test.ref)) + } + + @Test + fun `should not respond to setup request after being created`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox<ProcessEvent>() + + // Setup Machine + test.run(ProcessMessage.Setup(machine, inbox.ref)) + inbox.clear() + + // Try again + assertFalse(test.run(ProcessMessage.Setup(machine, inbox.ref))) + } + + @Test + fun `should respond to allocation with consumption`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox<ProcessEvent>() + + val allocation = ProcessMessage.Allocation(mapOf(ProcessingElement(0, cpu) to 0.5), until = 10.0) + + // Setup Machine + test.run(ProcessMessage.Setup(machine, inbox.ref)) + inbox.clear() + + // Test allocation + test.run(allocation) + val msg = inbox.receiveMessage() + assertTrue(msg is ProcessEvent.Consume) + } + + @Test + fun `should inform the machine that it finished processing`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox<ProcessEvent>() + val allocation = ProcessMessage.Allocation(mapOf(ProcessingElement(0, cpu) to 0.5), until = 10.0) + + // Setup + test.run(ProcessMessage.Setup(machine, inbox.ref)) + test.run(allocation) + test.runTo(10.0) + inbox.clear() + + test.run(allocation) + inbox.expectMessage(ProcessEvent.Exit(test.ref, 0)) + } + + @Test + fun `should be able to update its utilization`() { + val test = BehaviorTestKit(application()) + val inbox = test.createInbox<ProcessEvent>() + val allocation1 = ProcessMessage.Allocation(mapOf(ProcessingElement(0, cpu) to 0.5), until = 10.0) + val allocation2 = ProcessMessage.Allocation(mapOf(ProcessingElement(0, cpu) to 0.25), until = 10.0) + + // Setup + test.run(ProcessMessage.Setup(machine, inbox.ref)) + test.run(allocation1) + test.runTo(5.0) + inbox.clear() + + test.run(allocation2) + assertTrue(inbox.receiveMessage() is ProcessEvent.Consume) + } +} diff --git a/opendc-experiments-tpds/build.gradle.kts b/opendc-experiments-tpds/build.gradle.kts new file mode 100644 index 00000000..3ec580af --- /dev/null +++ b/opendc-experiments-tpds/build.gradle.kts @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` + application +} + +/* Project configuration */ +repositories { + jcenter() +} + +application { + mainClassName = "com.atlarge.opendc.experiments.tpds.TestExperiment" +} + +dependencies { + api(project(":opendc-core")) + implementation(project(":opendc-format-gwf")) + implementation(project(":opendc-format-sc18")) + implementation(project(":opendc-workflows")) + implementation(kotlin("stdlib")) + + runtimeOnly(project(":odcsim-engine-omega")) + runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.11.2") +} diff --git a/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt b/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt new file mode 100644 index 00000000..ad302889 --- /dev/null +++ b/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt @@ -0,0 +1,148 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.tpds + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorSystemFactory +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.odcsim.coroutines.suspending +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.stopped +import com.atlarge.odcsim.unhandled +import com.atlarge.odcsim.withTimers +import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader +import com.atlarge.opendc.format.trace.gwf.GwfTraceReader +import com.atlarge.opendc.model.Broker +import com.atlarge.opendc.model.Model +import com.atlarge.opendc.model.PlatformRef +import com.atlarge.opendc.model.find +import com.atlarge.opendc.model.services.provisioning.SimpleProvisioningService +import com.atlarge.opendc.model.services.resources.ResourceManagementService +import com.atlarge.opendc.model.services.workflows.StageWorkflowScheduler +import com.atlarge.opendc.model.services.workflows.WorkflowEvent +import com.atlarge.opendc.model.services.workflows.WorkflowMessage +import com.atlarge.opendc.model.services.workflows.WorkflowSchedulerMode +import com.atlarge.opendc.model.services.workflows.WorkflowService +import com.atlarge.opendc.model.services.workflows.stages.job.FifoJobSortingPolicy +import com.atlarge.opendc.model.services.workflows.stages.job.NullJobAdmissionPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.FirstFitResourceSelectionPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.FunctionalResourceDynamicFilterPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.FifoTaskSortingPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.FunctionalTaskEligibilityPolicy +import com.atlarge.opendc.model.workload.workflow.Job +import com.atlarge.opendc.model.zones +import java.io.File +import java.util.ServiceLoader +import kotlin.math.max + +/** + * Main entry point of the experiment. + */ +fun main(args: Array<String>) { + if (args.isEmpty()) { + println("error: Please provide path to GWF trace") + return + } + + + val scheduler = StageWorkflowScheduler( + mode = WorkflowSchedulerMode.Batch(100.0), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobSortingPolicy = FifoJobSortingPolicy(), + taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), + taskSortingPolicy = FifoTaskSortingPolicy(), + resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), + resourceSelectionPolicy = FirstFitResourceSelectionPolicy() + ) + + val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) + .use { it.read() } + .let { env -> + env.copy(platforms = env.platforms.map { platform -> + platform.copy(zones = platform.zones.map { zone -> + val services = zone.services + setOf(ResourceManagementService, SimpleProvisioningService, WorkflowService(scheduler)) + zone.copy(services = services) + }) + }) + } + + val broker = object : Broker { + override fun invoke(platforms: List<PlatformRef>): Behavior<*> = suspending<Any> { ctx -> + val zones = platforms.first().zones() + val service = zones.values.first().find(WorkflowService) + + val activeJobs = mutableSetOf<Job>() + val reader = GwfTraceReader(File(args[0])) + + fun submitNext(ctx: ActorContext<Any>, timers: TimerScheduler<Any>) { + if (!reader.hasNext()) { + return + } + + val (time, job) = reader.next() + timers.after(job, max(.0, time - ctx.time)) { + ctx.send(service, WorkflowMessage.Submit(job, ctx.self)) + submitNext(ctx, timers) + } + } + + var total = 0 + var finished = 0 + + withTimers { timers -> + submitNext(ctx, timers) + receiveMessage { msg -> + when (msg) { + is WorkflowEvent.JobSubmitted -> { + ctx.log.info("Job {} submitted", msg.job.uid) + total += 1 + same() + } + is WorkflowEvent.JobStarted -> { + activeJobs += msg.job + same() + } + is WorkflowEvent.JobFinished -> { + activeJobs -= msg.job + finished += 1 + ctx.log.info("Jobs {}/{} finished ({} tasks)", finished, total, msg.job.tasks.size) + if (activeJobs.isEmpty()) stopped() else same() + } + else -> + unhandled() + } + } + } + } + } + + val model = Model(environment, listOf(broker)) + val factory = ServiceLoader.load(ActorSystemFactory::class.java).first() + val system = factory(model(), name = "sim") + system.run() + system.terminate() +} diff --git a/opendc-experiments-tpds/src/main/resources/env/setup-test.json b/opendc-experiments-tpds/src/main/resources/env/setup-test.json new file mode 100644 index 00000000..0965b250 --- /dev/null +++ b/opendc-experiments-tpds/src/main/resources/env/setup-test.json @@ -0,0 +1,36 @@ +{ + "name": "Experimental Setup 2", + "rooms": [ + { + "type": "SERVER", + "objects": [ + { + "type": "RACK", + "machines": [ + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]}, + { "cpus": [2] }, { "cpus": [2]} + ] + }, + { + "type": "RACK", + "machines": [ + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]}, + { "cpus": [1] }, { "cpus": [1]} + ] + } + ] + } + ] +} diff --git a/opendc-experiments-tpds/src/main/resources/log4j2.xml b/opendc-experiments-tpds/src/main/resources/log4j2.xml new file mode 100644 index 00000000..67bf34ab --- /dev/null +++ b/opendc-experiments-tpds/src/main/resources/log4j2.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ MIT License + ~ + ~ Copyright (c) 2019 atlarge-research + ~ + ~ Permission is hereby granted, free of charge, to any person obtaining a copy + ~ of this software and associated documentation files (the "Software"), to deal + ~ in the Software without restriction, including without limitation the rights + ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + ~ copies of the Software, and to permit persons to whom the Software is + ~ furnished to do so, subject to the following conditions: + ~ + ~ The above copyright notice and this permission notice shall be included in all + ~ copies or substantial portions of the Software. + ~ + ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + ~ SOFTWARE. + --> + +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_ERR"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%X{actor.time}] %level %msg%n" /> + </Console> + <File name="File" fileName="data/opendc.log" append="false"> + <PatternLayout pattern="%d{yyy-MM-dd HH:mm:ss.SSS} [%X{actor.time}] %-5level %X{actor.ref} - %msg%n"/> + </File> + </Appenders> + <Loggers> + <Logger name="com.atlarge.odcsim" level="info" additivity="false"> + <AppenderRef ref="Console" level="info" /> + <AppenderRef ref="File" level="info" /> + </Logger> + + <!-- Experiment runner can log on INFO level --> + <Logger name="com.atlarge.opendc.experiments.tpds" level="debug" additivity="false"> + <AppenderRef ref="Console" level="info" /> + <AppenderRef ref="File" level="debug" /> + </Logger> + + <Root level="error"> + <AppenderRef ref="Console" /> + <AppenderRef ref="File" /> + </Root> + </Loggers> +</Configuration> diff --git a/opendc-format-gwf/build.gradle.kts b/opendc-format-gwf/build.gradle.kts new file mode 100644 index 00000000..caf86c0c --- /dev/null +++ b/opendc-format-gwf/build.gradle.kts @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":opendc-core")) + api(project(":opendc-format")) + api(project(":opendc-workflows")) + implementation(kotlin("stdlib")) + + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") +} diff --git a/opendc-format-gwf/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format-gwf/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt new file mode 100644 index 00000000..df6a4b11 --- /dev/null +++ b/opendc-format-gwf/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt @@ -0,0 +1,168 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.gwf + +import com.atlarge.odcsim.Instant +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import com.atlarge.opendc.model.User +import com.atlarge.opendc.model.workload.application.FlopsApplication +import com.atlarge.opendc.model.workload.workflow.Job +import com.atlarge.opendc.model.workload.workflow.Task +import java.io.BufferedReader +import java.io.File +import java.io.InputStream +import java.util.UUID +import kotlin.math.max +import kotlin.math.min + +/** + * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more + * information about the format. + * + * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore + * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a + * different format for better performance. + * + * @param reader The buffered reader to read the trace with. + */ +class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator<TraceEntry<Job>> + + /** + * Create a [GwfTraceReader] instance from the specified [File]. + * + * @param file The file to read from. + */ + constructor(file: File) : this(file.bufferedReader()) + + /** + * Create a [GwfTraceReader] instance from the specified [InputStream]. + * + * @param input The input stream to read from. + */ + constructor(input: InputStream) : this(input.bufferedReader()) + + /** + * Initialize the reader. + */ + init { + val entries = mutableMapOf<Long, TraceEntryImpl>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, List<Long>>() + + var workflowIdCol = 0 + var taskIdCol = 0 + var submitTimeCol = 0 + var runtimeCol = 0 + var coreCol = 0 + var dependencyCol = 0 + + try { + reader.lineSequence() + .filter { line -> + // Ignore comments in the trace + !line.startsWith("#") && line.isNotBlank() + } + .forEachIndexed { idx, line -> + val values = line.split(",") + + // Parse GWF header + if (idx == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + workflowIdCol = header["WorkflowID"]!! + taskIdCol = header["JobID"]!! + submitTimeCol = header["SubmitTime"]!! + runtimeCol = header["RunTime"]!! + coreCol = header["NProcs"]!! + dependencyCol = header["Dependencies"]!! + return@forEachIndexed + } + + val workflowId = values[workflowIdCol].trim().toLong() + val taskId = values[taskIdCol].trim().toLong() + val submitTime = values[submitTimeCol].trim().toDouble() + val runtime = max(0, values[runtimeCol].trim().toLong()) + val cores = values[coreCol].trim().toInt() + val dependencies = values[dependencyCol].split(" ") + .filter { it.isNotEmpty() } + .map { it.trim().toLong() } + + val flops: Long = 4000 * runtime * cores + + val entry = entries.getOrPut(workflowId) { + TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) + } + val workflow = entry.workload + val task = Task( + UUID(0L, taskId), "<unnamed>", + FlopsApplication(UUID(0L, taskId), "<unnamed>", workflow.owner, cores, flops), + HashSet() + ) + entry.submissionTime = min(entry.submissionTime, submitTime) + (workflow.tasks as MutableSet<Task>).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } + } finally { + reader.close() + } + + // Fix dependencies and dependents for all tasks + taskDependencies.forEach { (task, dependencies) -> + (task.dependencies as MutableSet<Task>).addAll(dependencies.map { taskId -> + tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") + }) + } + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<Job> = iterator.next() + + override fun close() {} + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "<unnamed>" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Instant, + override val workload: Job + ) : TraceEntry<Job> +} diff --git a/opendc-format-gwf/src/test/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReaderTest.kt b/opendc-format-gwf/src/test/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReaderTest.kt new file mode 100644 index 00000000..ca60f61d --- /dev/null +++ b/opendc-format-gwf/src/test/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReaderTest.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace.gwf + +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +/** + * Test suite for the [GwfTraceReader] class. + */ +@DisplayName("GwfTraceReader") +internal class GwfTraceReaderTest { + @Test + fun `should open from InputStream`() { + val input = GwfTraceReaderTest::class.java.getResourceAsStream("/askalon_workload_olde.gwf") + val reader = GwfTraceReader(input) + reader.close() + } +} diff --git a/opendc-format-gwf/src/test/resources/trace.gwf b/opendc-format-gwf/src/test/resources/trace.gwf new file mode 100644 index 00000000..75b4c8d8 --- /dev/null +++ b/opendc-format-gwf/src/test/resources/trace.gwf @@ -0,0 +1,4 @@ +WorkflowID, JobID , SubmitTime, RunTime , NProcs , ReqNProcs , Dependencies +0 , 1 , 50 , 13 , 1 , 1 , +1 , 2 , 64 , 13 , 1 , 1 , +1 , 3 , 1821 , 12 , 1 , 1 , 2 diff --git a/opendc-format-sc18/build.gradle.kts b/opendc-format-sc18/build.gradle.kts new file mode 100644 index 00000000..b656f4e3 --- /dev/null +++ b/opendc-format-sc18/build.gradle.kts @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":opendc-core")) + api(project(":opendc-format")) + api(project(":opendc-workflows")) + api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") + implementation(kotlin("stdlib")) + implementation(kotlin("reflect")) + + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") +} diff --git a/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt b/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt new file mode 100644 index 00000000..4fbde269 --- /dev/null +++ b/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Model.kt @@ -0,0 +1,44 @@ +package com.atlarge.opendc.format.environment.sc18 + +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo + +/** + * A datacenter setup. + * + * @property name The name of the setup. + * @property rooms The rooms in the datacenter. + */ +data class Setup(val name: String, val rooms: List<Room>) + +/** + * A room in a datacenter. + * + * @property type The type of room in the datacenter. + * @property objects The objects in the room. + */ +data class Room(val type: String, val objects: List<RoomObject>) + +/** + * An object in a [Room]. + * + * @property type The type of the room object. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) +sealed class RoomObject(val type: String) { + /** + * A rack in a server room. + * + * @property machines The machines in the rack. + */ + data class Rack(val machines: List<Machine>) : RoomObject("RACK") +} + +/** + * A machine in the setup that consists of the specified CPU's represented as + * integer identifiers and ethernet speed. + * + * @property cpus The CPUs in the machine represented as integer identifiers. + */ +data class Machine(val cpus: List<Int>) diff --git a/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt new file mode 100644 index 00000000..40ed5a45 --- /dev/null +++ b/opendc-format-sc18/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -0,0 +1,95 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment.sc18 + +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.atlarge.opendc.model.Cluster +import com.atlarge.opendc.model.Environment +import com.atlarge.opendc.model.Platform +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.resources.compute.ProcessingElement +import com.atlarge.opendc.model.resources.compute.ProcessingUnit +import com.atlarge.opendc.model.resources.compute.host.Host +import com.atlarge.opendc.model.resources.compute.scheduling.SpaceSharedMachineScheduler +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import java.io.InputStream +import java.util.UUID + +/** + * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Datacenter + * Schedulers". + * + * @param input The input stream to read from. + * @param mapper The Jackson object mapper to use. + */ +class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { + /** + * The environment that was read from the file. + */ + private val environment: Environment + + init { + val setup = mapper.readValue<Setup>(input) + val clusters = setup.rooms.mapIndexed { i, room -> + var counter = 0 + val hosts = room.objects.flatMap { roomObject -> + when (roomObject) { + is RoomObject.Rack -> { + roomObject.machines.map { machine -> + val cores = machine.cpus.flatMap { id -> + when (id) { + 1 -> List(4) { ProcessingElement(it, CPUS[0]) } + 2 -> List(2) { ProcessingElement(it, CPUS[1]) } + else -> throw IllegalArgumentException("The cpu id $id is not recognized") + } + } + Host(UUID.randomUUID(), "node-${counter++}", SpaceSharedMachineScheduler, cores) + } + } + } + } + Cluster(UUID.randomUUID(), "cluster-$i", hosts) + } + + val platform = Platform(UUID.randomUUID(), "sc18-platform", listOf( + Zone(UUID.randomUUID(), "zone", emptySet(), clusters) + )) + + environment = Environment(setup.name, null, listOf(platform)) + } + + override fun read(): Environment = environment + + override fun close() {} + + companion object { + val CPUS = arrayOf( + ProcessingUnit("Intel", 6, 6920, "Intel(R) Core(TM) i7-6920HQ CPU @ 4.10GHz", 4100.0, 1), + ProcessingUnit("Intel", 6, 6930, "Intel(R) Core(TM) i7-6920HQ CPU @ 3.50GHz", 3500.0, 1) + ) + } +} diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts new file mode 100644 index 00000000..68f9aa5d --- /dev/null +++ b/opendc-format/build.gradle.kts @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":odcsim-core")) + api(project(":opendc-core")) + + implementation(kotlin("stdlib")) + + testImplementation(project(":odcsim-testkit")) + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") + testRuntimeOnly("org.slf4j:slf4j-simple:1.7.25") + testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.0.0") +} diff --git a/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt new file mode 100644 index 00000000..0ba3ae25 --- /dev/null +++ b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.environment + +import com.atlarge.opendc.model.Environment +import java.io.Closeable + +/** + * An interface for reading descriptions of datacenter environments into memory as [Environment]. + */ +interface EnvironmentReader : Closeable { + /** + * Read the description of the datacenter environment as [Environment]. + */ + fun read(): Environment +} diff --git a/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt new file mode 100644 index 00000000..cf0ab526 --- /dev/null +++ b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceEntry.kt @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.odcsim.Instant +import com.atlarge.opendc.model.workload.Workload + +/** + * An entry in a workload trace. + * + * @param T The shape of the workload in this entry. + */ +interface TraceEntry<T : Workload> { + /** + * The time of submission of the workload. + */ + val submissionTime: Instant + + /** + * The workload in this trace entry. + */ + val workload: T + + /** + * Extract the submission time from this entry. + */ + operator fun component1() = submissionTime + + /** + * Extract the workload from this entry. + */ + operator fun component2() = workload +} diff --git a/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt new file mode 100644 index 00000000..af8b272d --- /dev/null +++ b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceReader.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.opendc.model.workload.Workload +import java.io.Closeable + +/** + * An interface for reading [Workload]s into memory. + * + * This interface must guarantee that the entries are delivered in order of submission time. + * + * @param T The shape of the workloads supported by this reader. + */ +interface TraceReader<T : Workload> : Iterator<TraceEntry<T>>, Closeable diff --git a/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt new file mode 100644 index 00000000..b5424fd2 --- /dev/null +++ b/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/TraceWriter.kt @@ -0,0 +1,46 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.format.trace + +import com.atlarge.odcsim.Instant +import com.atlarge.opendc.model.workload.Workload +import java.io.Closeable + +/** + * An interface for persisting workload traces (e.g. to disk). + * + * @param T The type of [Workload] supported by this writer. + */ +interface TraceWriter<T : Workload> : Closeable { + /** + * Write an entry to the trace. + * + * Entries must be written in order of submission time. Failing to do so results in a [IllegalArgumentException]. + * + * @param submissionTime The time of submission of the workload. + * @param workload The workload to write to the trace. + */ + fun write(submissionTime: Instant, workload: T) +} diff --git a/opendc-workflows/build.gradle.kts b/opendc-workflows/build.gradle.kts new file mode 100644 index 00000000..68f9aa5d --- /dev/null +++ b/opendc-workflows/build.gradle.kts @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +/* Build configuration */ +apply(from = "../gradle/kotlin.gradle") +plugins { + `java-library` +} + +/* Project configuration */ +repositories { + jcenter() +} + +val junitJupiterVersion: String by extra +val junitPlatformVersion: String by extra + +dependencies { + api(project(":odcsim-core")) + api(project(":opendc-core")) + + implementation(kotlin("stdlib")) + + testImplementation(project(":odcsim-testkit")) + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion") + testImplementation("org.junit.platform:junit-platform-launcher:$junitPlatformVersion") + testRuntimeOnly("org.slf4j:slf4j-simple:1.7.25") + testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.0.0") +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt new file mode 100644 index 00000000..45f3c4b0 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.model.services.workflows.stages.job.JobAdmissionPolicy +import com.atlarge.opendc.model.services.workflows.stages.job.JobSortingPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceDynamicFilterPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceSelectionPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.TaskEligibilityPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.TaskSortingPolicy + +/** + * A [WorkflowScheduler] that distributes work through a multi-stage process based on the Reference Architecture for + * Datacenter Scheduling. + */ +class StageWorkflowScheduler( + private val mode: WorkflowSchedulerMode, + private val jobAdmissionPolicy: JobAdmissionPolicy, + private val jobSortingPolicy: JobSortingPolicy, + private val taskEligibilityPolicy: TaskEligibilityPolicy, + private val taskSortingPolicy: TaskSortingPolicy, + private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, + private val resourceSelectionPolicy: ResourceSelectionPolicy +) : WorkflowScheduler { + override fun invoke( + ctx: ActorContext<WorkflowMessage>, + timers: TimerScheduler<WorkflowMessage>, + lease: ProvisioningResponse.Lease + ): WorkflowSchedulerLogic { + return StageWorkflowSchedulerLogic(ctx, timers, lease, mode, jobAdmissionPolicy, + jobSortingPolicy, taskEligibilityPolicy, taskSortingPolicy, resourceDynamicFilterPolicy, resourceSelectionPolicy) + } +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt new file mode 100644 index 00000000..9d5f4bea --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt @@ -0,0 +1,248 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.odcsim.unsafeCast +import com.atlarge.opendc.model.resources.compute.MachineMessage +import com.atlarge.opendc.model.resources.compute.MachineRef +import com.atlarge.opendc.model.resources.compute.scheduling.ProcessState +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.stages.job.JobAdmissionPolicy +import com.atlarge.opendc.model.services.workflows.stages.job.JobSortingPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceDynamicFilterPolicy +import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceSelectionPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.TaskEligibilityPolicy +import com.atlarge.opendc.model.services.workflows.stages.task.TaskSortingPolicy +import com.atlarge.opendc.model.workload.application.Application +import com.atlarge.opendc.model.workload.application.Pid +import com.atlarge.opendc.model.workload.workflow.Job +import com.atlarge.opendc.model.workload.workflow.Task + +/** + * Logic of the [StageWorkflowScheduler]. + */ +class StageWorkflowSchedulerLogic( + ctx: ActorContext<WorkflowMessage>, + timers: TimerScheduler<WorkflowMessage>, + lease: ProvisioningResponse.Lease, + private val mode: WorkflowSchedulerMode, + private val jobAdmissionPolicy: JobAdmissionPolicy, + private val jobSortingPolicy: JobSortingPolicy, + private val taskEligibilityPolicy: TaskEligibilityPolicy, + private val taskSortingPolicy: TaskSortingPolicy, + private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, + private val resourceSelectionPolicy: ResourceSelectionPolicy +) : WorkflowSchedulerLogic(ctx, timers, lease) { + + /** + * The incoming jobs ready to be processed by the scheduler. + */ + internal val incomingJobs: MutableSet<JobView> = mutableSetOf() + + /** + * The active jobs in the system. + */ + internal val activeJobs: MutableSet<JobView> = mutableSetOf() + + /** + * The running tasks by [Pid]. + */ + internal val taskByPid = mutableMapOf<Pid, TaskView>() + + /** + * The available processor cores on the leased machines. + */ + internal val machineCores: MutableMap<HostView, Int> = HashMap() + + init { + lease.hosts.forEach { machineCores[it] = it.host.cores.count() } + } + + override fun submit(job: Job, handler: ActorRef<WorkflowEvent>) { + // J1 Incoming Jobs + val jobInstance = JobView(job, handler) + val instances = job.tasks.associateWith { + TaskView(jobInstance, it) + } + + for ((task, instance) in instances) { + instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) + task.dependencies.forEach { + instances[it]!!.dependents.add(instance) + } + + // If the task has no dependency, it is a root task and can immediately be evaluated + if (instance.isRoot) { + instance.state = ProcessState.READY + } + } + + jobInstance.tasks = instances.values.toMutableSet() + incomingJobs += jobInstance + ctx.send(handler, WorkflowEvent.JobSubmitted(ctx.self, job, ctx.time)) + requestCycle() + } + + /** + * Indicate to the scheduler that a scheduling cycle is needed. + */ + private fun requestCycle() { + when (mode) { + is WorkflowSchedulerMode.Interactive -> { + schedule() + } + is WorkflowSchedulerMode.Batch -> { + timers.after(mode, mode.quantum) { + schedule() + } + } + } + } + + /** + * Perform a scheduling cycle immediately. + */ + override fun schedule() { + // J2 Create list of eligible jobs + jobAdmissionPolicy.startCycle(this) + val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) } + for (jobInstance in eligibleJobs) { + incomingJobs -= jobInstance + activeJobs += jobInstance + ctx.send(jobInstance.broker, WorkflowEvent.JobStarted(ctx.self, jobInstance.job, ctx.time)) + } + + // J3 Sort jobs on criterion + val sortedJobs = jobSortingPolicy(this, activeJobs) + + // J4 Per job + for (jobInstance in sortedJobs) { + // T1 Create list of eligible tasks + taskEligibilityPolicy.startCycle(this) + val eligibleTasks = jobInstance.tasks.filter { taskEligibilityPolicy.isEligible(this, it) } + + // T2 Sort tasks on criterion + val sortedTasks = taskSortingPolicy(this, eligibleTasks) + + // T3 Per task + for (instance in sortedTasks) { + val hosts = resourceDynamicFilterPolicy(this, lease.hosts, instance) + val host = resourceSelectionPolicy.select(this, hosts, instance) + + if (host != null) { + // T4 Submit task to machine + ctx.send(host.ref, MachineMessage.Submit(instance.task.application, instance, ctx.self.unsafeCast())) + instance.host = host + instance.state = ProcessState.RUNNING // Assume the application starts running + machineCores.merge(host, instance.task.application.cores, Int::minus) + } else { + return + } + } + } + } + + override fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) { + val task = key as TaskView + task.pid = pid + taskByPid[pid] = task + ctx.send(task.job.broker, WorkflowEvent.TaskStarted(ctx.self, task.job.job, task.task, ctx.time)) + } + + override fun onTermination(instance: MachineRef, pid: Pid, status: Int) { + val task = taskByPid.remove(pid)!! + val job = task.job + task.state = ProcessState.TERMINATED + job.tasks.remove(task) + machineCores.merge(task.host!!, task.task.application.cores, Int::plus) + ctx.send(job.broker, WorkflowEvent.TaskFinished(ctx.self, job.job, task.task, status, ctx.time)) + + if (job.isFinished) { + activeJobs -= job + ctx.send(job.broker, WorkflowEvent.JobFinished(ctx.self, job.job, ctx.time)) + } + + requestCycle() + } + + class JobView(val job: Job, val broker: ActorRef<WorkflowEvent>) { + /** + * A flag to indicate whether this job is finished. + */ + val isFinished: Boolean + get() = tasks.isEmpty() + + lateinit var tasks: MutableSet<TaskView> + } + + class TaskView(val job: JobView, val task: Task) { + /** + * The dependencies of this task. + */ + val dependencies = HashSet<TaskView>() + + /** + * The dependents of this task. + */ + val dependents = HashSet<TaskView>() + + /** + * A flag to indicate whether this workflow task instance is a workflow root. + */ + val isRoot: Boolean + get() = dependencies.isEmpty() + + var state: ProcessState = ProcessState.CREATED + set(value) { + field = value + + // Mark the process as terminated in the graph + if (value == ProcessState.TERMINATED) { + markTerminated() + } + } + + var pid: Pid? = null + + var host: HostView? = null + + /** + * Mark the specified [TaskView] as terminated. + */ + private fun markTerminated() { + for (dependent in dependents) { + dependent.dependencies.remove(this) + + if (dependent.isRoot) { + dependent.state = ProcessState.READY + } + } + } + } +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt new file mode 100644 index 00000000..c81085d4 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse + +/** + * A factory interface for constructing a [WorkflowSchedulerLogic]. + */ +interface WorkflowScheduler { + /** + * Construct a [WorkflowSchedulerLogic] in the given [ActorContext]. + * + * @param ctx The context in which the scheduler runs. + * @param timers The timer scheduler to use. + * @param lease The resource lease to use. + */ + operator fun invoke( + ctx: ActorContext<WorkflowMessage>, + timers: TimerScheduler<WorkflowMessage>, + lease: ProvisioningResponse.Lease + ): WorkflowSchedulerLogic +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt new file mode 100644 index 00000000..09cb0ef9 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.opendc.model.resources.compute.scheduling.ProcessObserver +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.model.workload.workflow.Job + +/** + * A workflow scheduler interface that schedules jobs across machines. + * + * @property ctx The context in which the scheduler runs. + * @property timers The timer scheduler to use. + * @property lease The resource lease to use. + */ +abstract class WorkflowSchedulerLogic( + protected val ctx: ActorContext<WorkflowMessage>, + protected val timers: TimerScheduler<WorkflowMessage>, + protected val lease: ProvisioningResponse.Lease +) : ProcessObserver { + /** + * Submit the specified workflow for scheduling. + */ + abstract fun submit(job: Job, handler: ActorRef<WorkflowEvent>) + + /** + * Trigger an immediate scheduling cycle. + */ + abstract fun schedule() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt new file mode 100644 index 00000000..0a4b40e5 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.Duration + +/** + * The operating mode of a workflow scheduler. + */ +sealed class WorkflowSchedulerMode { + /** + * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. + */ + object Interactive : WorkflowSchedulerMode() + + /** + * A batch scheduler triggers a scheduling cycle every time quantum if needed. + */ + data class Batch(val quantum: Duration) : WorkflowSchedulerMode() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt new file mode 100644 index 00000000..72397203 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt @@ -0,0 +1,193 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.TimerScheduler +import com.atlarge.odcsim.coroutines.actorContext +import com.atlarge.odcsim.coroutines.dsl.ask +import com.atlarge.odcsim.coroutines.suspending +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.same +import com.atlarge.odcsim.unhandled +import com.atlarge.odcsim.withTimers +import com.atlarge.opendc.model.Zone +import com.atlarge.opendc.model.ZoneRef +import com.atlarge.opendc.model.find +import com.atlarge.opendc.model.resources.compute.MachineEvent +import com.atlarge.opendc.model.services.AbstractService +import com.atlarge.opendc.model.services.Service +import com.atlarge.opendc.model.services.ServiceProvider +import com.atlarge.opendc.model.services.provisioning.ProvisioningMessage +import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse +import com.atlarge.opendc.model.services.provisioning.ProvisioningService +import com.atlarge.opendc.model.workload.workflow.Job +import com.atlarge.opendc.model.workload.workflow.Task +import java.util.UUID + +/** + * A service for cloud workflow management. + * + * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al. + */ +class WorkflowService(val scheduler: WorkflowScheduler) : ServiceProvider { + override val uid: UUID = UUID.randomUUID() + override val name: String = "workflows" + override val provides: Set<Service<*>> = setOf(WorkflowService) + override val dependencies: Set<Service<*>> = setOf(ProvisioningService) + + /** + * Build the runtime [Behavior] for the workflow service, responding to messages of shape [WorkflowMessage]. + */ + override fun invoke(zone: Zone, ref: ZoneRef): Behavior<WorkflowMessage> = suspending { ctx -> + val provisioner = ref.find(ProvisioningService) + // Wait for 0.1 sec before asking the provisioner to allow it to initialize. Will return empty response if asked + // immediately. + val lease: ProvisioningResponse.Lease = actorContext<ProvisioningResponse>().ask(provisioner, after = 0.1) { ProvisioningMessage.Request(Int.MAX_VALUE, it) } + + withTimers<Any> { timers -> + @Suppress("UNCHECKED_CAST") + val schedulerLogic = scheduler(ctx, timers as TimerScheduler<WorkflowMessage>, lease) + + receiveMessage { msg -> + when (msg) { + is WorkflowMessage.Submit -> { + schedulerLogic.submit(msg.job, msg.broker) + same() + } + is MachineEvent.Submitted -> { + schedulerLogic.onSubmission(msg.instance, msg.application, msg.key, msg.pid) + same() + } + is MachineEvent.Terminated -> { + schedulerLogic.onTermination(msg.instance, msg.pid, msg.status) + same() + } + else -> + unhandled() + } + } + }.narrow() + } + + companion object : AbstractService<WorkflowMessage>(UUID.randomUUID(), "workflows") +} + +/** + * A reference to the workflow service instance. + */ +typealias WorkflowServiceRef = ActorRef<WorkflowMessage> + +/** + * A message protocol for communicating to the workflow service. + */ +sealed class WorkflowMessage { + /** + * Submit the specified [Job] to the workflow service for scheduling. + * + * @property job The workflow to submit for scheduling. + * @property broker The broker that has submitted this workflow on behalf of a user and that needs to be kept + * up-to-date. + */ + data class Submit(val job: Job, val broker: ActorRef<WorkflowEvent>) : WorkflowMessage() +} + +/** + * A message protocol used by the workflow service to respond to [WorkflowMessage]s. + */ +sealed class WorkflowEvent { + /** + * Indicate that the specified [Job] was submitted to the workflow service. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that has been submitted. + * @property time A timestamp of the moment the job was received. + */ + data class JobSubmitted( + val service: WorkflowServiceRef, + val job: Job, + val time: Instant + ) : WorkflowEvent() + + /** + * Indicate that the specified [Job] has become active. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that has been submitted. + * @property time A timestamp of the moment the job started. + */ + data class JobStarted( + val service: WorkflowServiceRef, + val job: Job, + val time: Instant + ) : WorkflowEvent() + + /** + * Indicate that the specified [Task] has started processing. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that contains this task. + * @property task The task that has started processing. + * @property time A timestamp of the moment the task started. + */ + data class TaskStarted( + val service: WorkflowServiceRef, + val job: Job, + val task: Task, + val time: Instant + ) : WorkflowEvent() + + /** + * Indicate that the specified [Task] has started processing. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that contains this task. + * @property task The task that has started processing. + * @property status The exit code of the task, where zero means successful. + * @property time A timestamp of the moment the task finished. + */ + data class TaskFinished( + val service: WorkflowServiceRef, + val job: Job, + val task: Task, + val status: Int, + val time: Instant + ) : WorkflowEvent() + + /** + * Indicate that the specified [Job] has finished processing. + * + * @property service The reference to the service the job was submitted to. + * @property job The job that has finished processing. + * @property time A timestamp of the moment the task finished. + */ + data class JobFinished( + val service: WorkflowServiceRef, + val job: Job, + val time: Instant + ) : WorkflowEvent() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt new file mode 100644 index 00000000..c58d2210 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue. + */ +class FifoJobSortingPolicy : JobSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + jobs: Collection<StageWorkflowSchedulerLogic.JobView> + ): List<StageWorkflowSchedulerLogic.JobView> = jobs.toList() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt new file mode 100644 index 00000000..be60fa9b --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A policy interface for admitting [StageWorkflowSchedulerLogic.JobView]s to a scheduling cycle. + */ +interface JobAdmissionPolicy { + /** + * A method that is invoked at the start of each scheduling cycle. + * + * @param scheduler The scheduler that started the cycle. + */ + fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + + /** + * Determine whether the specified [StageWorkflowSchedulerLogic.JobView] should be admitted to the scheduling cycle. + * + * @param scheduler The scheduler that should admit or reject the job. + * @param job The workflow that has been submitted. + * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise. + */ + fun shouldAdmit(scheduler: StageWorkflowSchedulerLogic, job: StageWorkflowSchedulerLogic.JobView): Boolean +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt new file mode 100644 index 00000000..3af88aa7 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A policy interface for ordering admitted workflows in the scheduling queue. + */ +interface JobSortingPolicy { + /** + * Sort the given collection of jobs on a given criterion. + * + * @param scheduler The scheduler that started the cycle. + * @param jobs The collection of tasks that should be sorted. + * @return The sorted list of jobs. + */ + operator fun invoke( + scheduler: StageWorkflowSchedulerLogic, + jobs: Collection<StageWorkflowSchedulerLogic.JobView> + ): List<StageWorkflowSchedulerLogic.JobView> +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt new file mode 100644 index 00000000..5436a1a1 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A [JobAdmissionPolicy] that admits all jobs. + */ +object NullJobAdmissionPolicy : JobAdmissionPolicy { + /** + * Admit every submitted job. + */ + override fun shouldAdmit( + scheduler: StageWorkflowSchedulerLogic, + job: StageWorkflowSchedulerLogic.JobView + ): Boolean = true +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt new file mode 100644 index 00000000..7da59692 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.job + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic +import kotlin.random.Random + +/** + * The [RandomJobSortingPolicy] sorts tasks randomly. + * + * @property random The [Random] instance to use when sorting the list of tasks. + */ +class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + jobs: Collection<StageWorkflowSchedulerLogic.JobView> + ): List<StageWorkflowSchedulerLogic.JobView> = jobs.shuffled(random) +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt new file mode 100644 index 00000000..afaf075d --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.resources + +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A [ResourceSelectionPolicy] that selects the first machine that is available. + */ +class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { + override fun select( + scheduler: StageWorkflowSchedulerLogic, + machines: List<HostView>, + task: StageWorkflowSchedulerLogic.TaskView + ): HostView? = + machines.firstOrNull() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt new file mode 100644 index 00000000..3f28a040 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.resources + +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for + * the task. + */ +class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + machines: List<HostView>, + task: StageWorkflowSchedulerLogic.TaskView + ): List<HostView> { + return machines + .filter { scheduler.machineCores[it] ?: 0 >= task.task.application.cores } + } +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt new file mode 100644 index 00000000..f73c0d9e --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.resources + +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding + * a list of resources with sufficient resource-capacities, based on fixed or dynamic requirements, and on predicted or + * monitored information about processing unit availability, memory occupancy, etc. + */ +interface ResourceDynamicFilterPolicy { + /** + * Filter the list of machines based on dynamic information. + * + * @param scheduler The scheduler to filter the machines. + * @param machines The list of machines in the system. + * @param task The task that is to be scheduled. + * @return The machines on which the task can be scheduled. + */ + operator fun invoke( + scheduler: StageWorkflowSchedulerLogic, + machines: List<HostView>, + task: StageWorkflowSchedulerLogic.TaskView + ): List<HostView> +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt new file mode 100644 index 00000000..a9172a53 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.resources + +import com.atlarge.opendc.model.services.resources.HostView +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected + * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. + */ +interface ResourceSelectionPolicy { + /** + * Select a machine on which the task should be scheduled. + * + * @param scheduler The scheduler to select the machine. + * @param machines The list of machines in the system. + * @param task The task that is to be scheduled. + * @return The selected machine or `null` if no machine could be found. + */ + fun select( + scheduler: StageWorkflowSchedulerLogic, + machines: List<HostView>, + task: StageWorkflowSchedulerLogic.TaskView + ): HostView? +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt new file mode 100644 index 00000000..2eb2f6fb --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue. + */ +class FifoTaskSortingPolicy : TaskSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + tasks: Collection<StageWorkflowSchedulerLogic.TaskView> + ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.toList() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt new file mode 100644 index 00000000..2e7cc8c1 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.resources.compute.scheduling.ProcessState +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. + */ +class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy { + override fun isEligible( + scheduler: StageWorkflowSchedulerLogic, + task: StageWorkflowSchedulerLogic.TaskView + ): Boolean = task.state == ProcessState.READY +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt new file mode 100644 index 00000000..69462e41 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic +import kotlin.random.Random + +/** + * The [RandomTaskSortingPolicy] sorts tasks randomly. + * + * @property random The [Random] instance to use when sorting the list of tasks. + */ +class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy { + override fun invoke( + scheduler: StageWorkflowSchedulerLogic, + tasks: Collection<StageWorkflowSchedulerLogic.TaskView> + ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.shuffled(random) +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt new file mode 100644 index 00000000..c3c7e725 --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * A policy interface for determining the eligibility of tasks in a scheduling cycle. + */ +interface TaskEligibilityPolicy { + /** + * A method that is invoked at the start of each scheduling cycle. + * + * @param scheduler The scheduler that started the cycle. + */ + fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + + /** + * Determine whether the specified [StageWorkflowSchedulerLogic.TaskView] is eligible to be scheduled. + * + * @param scheduler The scheduler that is determining whether the task is eligible. + * @param task The task instance to schedule. + * @return `true` if the task eligible to be scheduled, `false` otherwise. + */ + fun isEligible(scheduler: StageWorkflowSchedulerLogic, task: StageWorkflowSchedulerLogic.TaskView): Boolean +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt new file mode 100644 index 00000000..3f296d0e --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.services.workflows.stages.task + +import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic + +/** + * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the + * scheduler with a sorted list of tasks to schedule. + */ +interface TaskSortingPolicy { + /** + * Sort the given list of tasks on a given criterion. + * + * @param scheduler The scheduler that is sorting the tasks. + * @param tasks The collection of tasks that should be sorted. + * @return The sorted list of tasks. + */ + operator fun invoke( + scheduler: StageWorkflowSchedulerLogic, + tasks: Collection<StageWorkflowSchedulerLogic.TaskView> + ): List<StageWorkflowSchedulerLogic.TaskView> +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt new file mode 100644 index 00000000..dd72cf6d --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.workflow + +import com.atlarge.opendc.model.User +import com.atlarge.opendc.model.workload.Workload +import java.util.UUID + +/** + * A workload that represents a directed acyclic graph (DAG) of tasks with control and data dependencies between tasks. + * + * @property uid A unique identified of this workflow. + * @property name The name of this workflow. + * @property owner The owner of the workflow. + * @property tasks The tasks that are part of this workflow. + */ +data class Job( + override val uid: UUID, + override val name: String, + override val owner: User, + val tasks: Set<Task> +) : Workload { + override fun equals(other: Any?): Boolean = other is Job && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt new file mode 100644 index 00000000..0cc3fa0e --- /dev/null +++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.model.workload.workflow + +import com.atlarge.opendc.model.Identity +import com.atlarge.opendc.model.workload.application.Application +import java.util.UUID + +/** + * A stage of a [Job]. + * + * @property uid A unique identified of this task. + * @property name The name of this task. + * @property application The application to run as part of this workflow task. + * @property dependencies The dependencies of this task in order for it to execute. + */ +data class Task( + override val uid: UUID, + override val name: String, + val application: Application, + val dependencies: Set<Task> +) : Identity { + override fun equals(other: Any?): Boolean = other is Task && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/settings.gradle.kts b/settings.gradle.kts index a9b2ea83..085885ba 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -27,3 +27,9 @@ include(":odcsim-core") include(":odcsim-engine-tests") include(":odcsim-engine-omega") include(":odcsim-testkit") +include(":opendc-core") +include(":opendc-experiments-tpds") +include(":opendc-format") +include(":opendc-format-gwf") +include(":opendc-format-sc18") +include(":opendc-workflows") |
