diff options
Diffstat (limited to 'opendc')
59 files changed, 304 insertions, 2491 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index dad24ebe..cb14835a 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.core import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.core.Identity import java.util.UUID diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/FakeBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/FakeBareMetalDriver.kt index 1b3782f6..7a6d9123 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/FakeBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/FakeBareMetalDriver.kt @@ -29,7 +29,6 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerManagementContext -import com.atlarge.opendc.compute.core.execution.serialize import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -71,7 +70,7 @@ public class FakeBareMetalDriver( val previousPowerState = node.powerState val server = when (node.powerState to powerState) { PowerState.POWER_OFF to PowerState.POWER_OFF -> null - PowerState.POWER_OFF to PowerState.POWER_ON -> Server(node.uid, node.name, flavor, node.image, ServerState.BUILD) + PowerState.POWER_OFF to PowerState.POWER_ON -> Server(UUID.randomUUID(), node.name, flavor, node.image, ServerState.BUILD) PowerState.POWER_ON to PowerState.POWER_OFF -> null // TODO Terminate existing image PowerState.POWER_ON to PowerState.POWER_ON -> node.server else -> throw IllegalStateException() @@ -110,6 +109,8 @@ public class FakeBareMetalDriver( } private val serverCtx = object : ServerManagementContext { + private var initialized: Boolean = false + override var server: Server get() = node.server!! set(value) { @@ -117,9 +118,15 @@ public class FakeBareMetalDriver( } override suspend fun init() { + if (initialized) { + throw IllegalStateException() + } + val previousState = server.state server = server.copy(state = ServerState.ACTIVE) monitor.onUpdate(server, previousState) + + initialized = true } override suspend fun exit(cause: Throwable?) { @@ -127,6 +134,7 @@ public class FakeBareMetalDriver( val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR server = server.copy(state = state) monitor.onUpdate(server, previousState) + initialized = false } override suspend fun run(req: LongArray) { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt index ba2ebc80..24ade799 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt @@ -28,6 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver +import com.atlarge.opendc.core.services.AbstractServiceKey +import java.util.UUID /** * A cloud platform service for provisioning bare-metal compute nodes on the platform. @@ -52,4 +54,9 @@ public interface ProvisioningService { * Deploy the specified [Image] on a compute node. */ public suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node + + /** + * The service key of this service. + */ + companion object Key : AbstractServiceKey<ProvisioningService>(UUID.randomUUID(), "provisioner") } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index dc860405..6b5c0979 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -62,6 +62,7 @@ public class SimpleProvisioningService : ProvisioningService, ServerMonitor { val driver = nodes[node]!! driver.setImage(image) + driver.setPower(PowerState.POWER_OFF) val newNode = driver.setPower(PowerState.POWER_ON) monitors[newNode.server!!] = monitor return newNode diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index 0bbd4b75..9c4a4e2a 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -31,14 +31,12 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.PowerState import com.atlarge.opendc.compute.metal.driver.FakeBareMetalDriver +import java.util.ServiceLoader +import java.util.UUID import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test -import java.util.ServiceLoader -import java.util.UUID - /** * Test suite for the [SimpleProvisioningService]. @@ -60,7 +58,6 @@ internal class SimpleProvisioningServiceTest { } val driver = FakeBareMetalDriver(UUID.randomUUID(), "test", flavor) - val provisioner = SimpleProvisioningService() provisioner.create(driver) delay(5) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt deleted file mode 100644 index da9aed00..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Cluster.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.resources.compute.MachineMessage -import com.atlarge.opendc.core.resources.compute.host.Host -import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent -import java.util.UUID - -/** - * A logical grouping of heterogeneous hosts and primary storage within a zone. - * - * @property uid The unique identifier of the cluster. - * @property name The name of this cluster. - * @property hosts The machines in this cluster. - */ -data class Cluster(override val uid: UUID, override val name: String, val hosts: List<Host>) : Identity { - /** - * Build the runtime [Behavior] of this cluster of hosts. - * - * @param manager The manager of the cluster. - */ - suspend operator fun invoke(ctx: ProcessContext, manager: SendRef<MachineSupervisionEvent>) { - // Launch all hosts in the cluster - for (host in hosts) { - val channel = ctx.open<MachineMessage>() - ctx.spawn({ host(it, manager, channel) }, name = host.name) - } - } -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt deleted file mode 100644 index 3d16c4b2..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Model.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core - -import com.atlarge.odcsim.ProcessContext - -/** - * A simulation model for large-scale simulation of datacenter infrastructure, built with the *odcsim* API. - * - * @property environment The environment in which brokers operate. - * @property brokers The brokers acting on the cloud platforms. - */ -data class Model(val environment: Environment, val brokers: List<Broker>) { - /** - * Build the runtime behavior of the universe. - */ - suspend operator fun invoke(ctx: ProcessContext) { - // Setup the environment - val platforms = environment.platforms.map { platform -> - val channel = ctx.open<PlatformMessage>() - ctx.spawn({ platform(it, channel.receive) }, name = platform.name) - channel.send - } - - for (broker in brokers) { - ctx.spawn { broker(it, platforms) } - } - } -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt index fab67962..c0424c20 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Platform.kt @@ -24,15 +24,7 @@ package com.atlarge.opendc.core -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.ReceiveRef -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.ask -import com.atlarge.odcsim.sendOnce import java.util.UUID -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive /** * A representation of a cloud platform such as Amazon Web Services (AWS), Microsoft Azure or Google Cloud. @@ -41,63 +33,4 @@ import kotlinx.coroutines.isActive * @property name the name of the platform. * @property zones The availability zones available on this platform. */ -data class Platform(override val uid: UUID, override val name: String, val zones: List<Zone>) : Identity { - /** - * Build the runtime [Behavior] of this cloud platform. - */ - suspend operator fun invoke(ctx: ProcessContext, main: ReceiveRef<PlatformMessage>) { - println("Starting cloud platform $name [$uid] with ${zones.size} zones") - - // Launch all zones of the cloud platform - val zoneInstances = zones.associateWith { zone -> - val channel = ctx.open<ZoneMessage>() - ctx.spawn({ zone(it, channel) }, name = zone.name) - channel.send - } - - val inlet = ctx.listen(main) - coroutineScope { - while (isActive) { - when (val msg = inlet.receive()) { - is PlatformMessage.ListZones -> { - msg.replyTo.sendOnce(PlatformResponse.Zones(this@Platform, zoneInstances.mapKeys { it.key.name })) - } - } - } - } - inlet.close() - } -} - -/** - * A message protocol for communicating with a cloud platform. - */ -sealed class PlatformMessage { - /** - * Request the available zones on this platform. - * - * @property replyTo The actor address to send the response to. - */ - data class ListZones(val replyTo: SendRef<PlatformResponse.Zones>) : PlatformMessage() -} - -/** - * A message protocol used by platform actors to respond to [PlatformMessage]s. - */ -sealed class PlatformResponse { - /** - * The zones available on this cloud platform. - * - * @property platform The reference to the cloud platform these are the zones of. - * @property zones The zones in this cloud platform. - */ - data class Zones(val platform: Platform, val zones: Map<String, SendRef<ZoneMessage>>) : PlatformResponse() -} - -/** - * Retrieve the available zones of a platform. - */ -suspend fun SendRef<PlatformMessage>.zones(): Map<String, SendRef<ZoneMessage>> { - val zones: PlatformResponse.Zones = ask { PlatformMessage.ListZones(it) } - return zones.zones -} +data class Platform(override val uid: UUID, override val name: String, val zones: List<Zone>) : Identity diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt index 6c3ffd02..5f5d946b 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Zone.kt @@ -24,18 +24,8 @@ package com.atlarge.opendc.core -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.ask -import com.atlarge.odcsim.sendOnce -import com.atlarge.opendc.core.services.Service -import com.atlarge.opendc.core.services.ServiceProvider -import java.util.ArrayDeque +import com.atlarge.opendc.core.services.ServiceRegistry import java.util.UUID -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive /** * An isolated location within a datacenter region from which public cloud services operate, roughly equivalent to a @@ -46,149 +36,13 @@ import kotlinx.coroutines.isActive * * @property uid The unique identifier of this availability zone. * @property name The name of the zone within its platform. - * @property services The initial set of services provided by the zone. - * @property clusters The clusters of machines in this zone. + * @property services The service registry containing the services of the zone. */ data class Zone( override val uid: UUID, override val name: String, - val services: Set<ServiceProvider>, - val clusters: List<Cluster> + val services: ServiceRegistry ) : Identity { - /** - * Build the runtime [Behavior] of this datacenter. - */ - suspend operator fun invoke(ctx: ProcessContext, main: Channel<ZoneMessage>) { - println("Starting zone $name [$uid]") - - // Launch all services of the zone - val instances: MutableMap<Service<*>, SendRef<*>> = mutableMapOf() - validateDependencies(services) - - for (provider in services) { - val channel = ctx.open<Any>() - println("Spawning service ${provider.name}") - ctx.spawn({ provider(it, this, main.send, channel) }, name = "${provider.name}-${provider.uid}") - provider.provides.forEach { instances[it] = channel.send } - } - - val inlet = ctx.listen(main.receive) - - coroutineScope { - while (isActive) { - when (val msg = inlet.receive()) { - is ZoneMessage.Find -> { - msg.replyTo.sendOnce(ZoneResponse.Listing(this@Zone, msg.key, instances[msg.key])) - } - } - } - } - } - - /** - * Validate the service for unsatisfiable dependencies. - */ - private fun validateDependencies(providers: Set<ServiceProvider>) { - val providersByKey = HashMap<Service<*>, ServiceProvider>() - for (provider in providers) { - if (provider.provides.isEmpty()) { - throw IllegalArgumentException(("Service provider $provider does not provide any service.")) - } - for (key in provider.provides) { - if (key in providersByKey) { - throw IllegalArgumentException("Multiple providers for service $key") - } - providersByKey[key] = provider - } - } - - val visited = HashSet<ServiceProvider>() - val queue = ArrayDeque(providers) - while (queue.isNotEmpty()) { - val service = queue.poll() - visited.add(service) - - for (dependencyKey in service.dependencies) { - val dependency = providersByKey[dependencyKey] - ?: throw IllegalArgumentException("Dependency $dependencyKey not satisfied for service $service") - if (dependency !in visited) { - queue.add(dependency) - } - } - } - } - override fun equals(other: Any?): Boolean = other is Zone && uid == other.uid override fun hashCode(): Int = uid.hashCode() } - -/** - * A message protocol for communicating with the service registry. - */ -sealed class ZoneMessage { - /** - * Lookup the specified service in this availability zone. - * - * @property key The key of the service to lookup. - * @property replyTo The address to reply to. - */ - data class Find( - val key: Service<*>, - val replyTo: SendRef<ZoneResponse.Listing> - ) : ZoneMessage() -} - -/** - * A message protocol used by service registry actors to respond to [ZoneMessage]s. - */ -sealed class ZoneResponse { - /** - * The response sent when looking up services in a zone. - * - * @property zone The zone from which the response originates. - * @property key The key of the service that was looked up. - * @property ref The reference to the service or `null` if it is not present in the zone. - */ - data class Listing( - val zone: Zone, - val key: Service<*>, - private val ref: SendRef<*>? - ) : ZoneResponse() { - /** - * A flag to indicate whether the service is present. - */ - val isPresent: Boolean - get() = ref != null - - /** - * Determine whether this listing is for the specified key. - * - * @param key The key to check for. - * @return `true` if the listing is for this key, `false` otherwise. - */ - fun isForKey(key: Service<*>): Boolean = key == this.key - - /** - * Extract the result from the service lookup. - * - * @param key The key of the lookup. - * @return The reference to the service or `null` if it is not present in the zone. - */ - operator fun <T : Any> invoke(key: Service<T>): SendRef<T>? { - require(this.key == key) { "Invalid key" } - @Suppress("UNCHECKED_CAST") - return ref as? SendRef<T> - } - } -} - -/** - * Find the reference to the specified [ServiceProvider]. - * - * @param key The key of the service to find. - * @throws IllegalArgumentException if the service is not found. - */ -suspend fun <T : Any> SendRef<ZoneMessage>.find(key: Service<T>): SendRef<T> { - val listing: ZoneResponse.Listing = ask { ZoneMessage.Find(key, it) } - return listing(key) ?: throw IllegalArgumentException("Unknown key $key") -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt deleted file mode 100644 index f25fa3cc..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/Machine.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.Identity -import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent -import com.atlarge.opendc.core.workload.application.Application -import com.atlarge.opendc.core.workload.application.Pid - -/** - * A generic representation of a compute node (either physical or virtual) that is able to run [Application]s. - */ -interface Machine : Identity { - /** - * The details of the machine in key/value pairs. - */ - val details: Map<String, Any> - - /** - * Build the runtime [Behavior] of this compute resource, accepting messages of [MachineMessage]. - * - * @param supervisor The supervisor of the machine. - */ - suspend operator fun invoke(ctx: ProcessContext, supervisor: SendRef<MachineSupervisionEvent>, main: Channel<MachineMessage>) -} - -/** - * A reference to a machine instance that accepts messages of type [MachineMessage]. - */ -typealias MachineRef = SendRef<MachineMessage> - -/** - * A message protocol for communicating with machine instances. - */ -sealed class MachineMessage { - /** - * Launch the specified [Application] on the machine instance. - * - * @property application The application to submit. - * @property key The key to identify this submission. - * @property broker The broker of the process to spawn. - */ - data class Submit( - val application: Application, - val key: Any, - val broker: SendRef<MachineEvent> - ) : MachineMessage() -} - -/** - * A message protocol used by machine instances to respond to [MachineMessage]s. - */ -sealed class MachineEvent { - /** - * Indicate that an [Application] was spawned on a machine instance. - * - * @property instance The machine instance to which the application was submitted. - * @property application The application that has been submitted. - * @property key The key used to identify the submission. - * @property pid The spawned application instance. - */ - data class Submitted( - val instance: MachineRef, - val application: Application, - val key: Any, - val pid: Pid - ) : MachineEvent() - - /** - * Indicate that an [Application] has terminated on the specified machine. - * - * @property instance The machine instance to which the application was submitted. - * @property pid The reference to the application instance that has terminated. - * @property status The exit code of the task, where zero means successful. - */ - data class Terminated( - val instance: MachineRef, - val pid: Pid, - val status: Int = 0 - ) : MachineEvent() -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt deleted file mode 100644 index 23a5b444..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingElement.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute - -/** - * A logical core in a CPU. - * - * @property id The identifier of the core within the CPU. - * @property unit The [ProcessingUnit] the core is part of. - */ -data class ProcessingElement(val id: Int, val unit: ProcessingUnit) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt deleted file mode 100644 index 76985f64..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/ProcessingUnit.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute - -/** - * A processing unit of a compute resource, either virtual or physical. - * - * @property vendor The vendor string of the cpu. - * @property family The cpu family number. - * @property model The model number of the cpu. - * @property modelName The name of the cpu model. - * @property clockRate The clock speed of the cpu in MHz. - * @property cores The number of logical cores in the cpu. - */ -data class ProcessingUnit( - val vendor: String, - val family: Int, - val model: Int, - val modelName: String, - val clockRate: Double, - val cores: Int -) diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt deleted file mode 100644 index 21217468..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/host/Host.kt +++ /dev/null @@ -1,83 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute.host - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.sendOnce -import com.atlarge.opendc.core.resources.compute.Machine -import com.atlarge.opendc.core.resources.compute.MachineMessage -import com.atlarge.opendc.core.resources.compute.ProcessingElement -import com.atlarge.opendc.core.resources.compute.scheduling.MachineScheduler -import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent -import java.util.UUID -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive - -/** - * A physical compute node in a datacenter that is able to run [Application]s. - * - * @property uid The unique identifier of this machine. - * @property name The name of the machine. - * @property scheduler The process scheduler of this machine. - * @property cores The list of processing elements in the machine. - * @property details The details of this host. - */ -data class Host( - override val uid: UUID, - override val name: String, - val scheduler: MachineScheduler, - val cores: List<ProcessingElement>, - override val details: Map<String, Any> = emptyMap() -) : Machine { - /** - * Build the [Behavior] for a physical machine. - */ - override suspend fun invoke(ctx: ProcessContext, supervisor: SendRef<MachineSupervisionEvent>, main: Channel<MachineMessage>) { - coroutineScope { - supervisor.sendOnce(MachineSupervisionEvent.Announce(this@Host, main.send)) - supervisor.sendOnce(MachineSupervisionEvent.Up(main.send)) - - val sched = scheduler(ctx, this, this@Host, main.send) - sched.updateResources(cores) - - val inlet = ctx.listen(main.receive) - while (isActive) { - when (val msg = inlet.receive()) { - is MachineMessage.Submit -> { - sched.submit(msg.application, msg.key, msg.broker) - } - } - } - inlet.close() - } - } - - override fun equals(other: Any?): Boolean = other is Machine && uid == other.uid - - override fun hashCode(): Int = uid.hashCode() -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt deleted file mode 100644 index 400c6a0f..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineScheduler.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute.scheduling - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.opendc.core.resources.compute.Machine -import com.atlarge.opendc.core.resources.compute.MachineRef -import kotlinx.coroutines.CoroutineScope - -/** - * A factory interface for constructing a [MachineSchedulerLogic]. - */ -interface MachineScheduler { - /** - * Construct a [MachineSchedulerLogic] in the given [ProcessContext]. - * - * @param machine The machine to create the scheduler for. - */ - operator fun invoke(ctx: ProcessContext, coroutineScope: CoroutineScope, machine: Machine, machineRef: MachineRef): MachineSchedulerLogic -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt deleted file mode 100644 index 9bc20eb8..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/MachineSchedulerLogic.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute.scheduling - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.resources.compute.Machine -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.resources.compute.MachineRef -import com.atlarge.opendc.core.resources.compute.ProcessingElement -import com.atlarge.opendc.core.workload.application.Application -import com.atlarge.opendc.core.workload.application.ProcessSupervisor -import kotlinx.coroutines.CoroutineScope - -/** - * A scheduler that distributes processes over processing elements in a machine. - * - * @property ctx The context in which the scheduler runs. - * @property machine The machine to create the scheduler for. - */ -abstract class MachineSchedulerLogic( - protected val ctx: ProcessContext, - protected val coroutineScope: CoroutineScope, - protected val machine: Machine, - protected val machineRef: MachineRef -) : ProcessSupervisor { - /** - * Update the available resources in the machine. - * - * @param cores The available processing cores for the scheduler. - */ - abstract suspend fun updateResources(cores: List<ProcessingElement>) - - /** - * Submit the specified application for scheduling. - * - * @param application The application to submit. - * @param key The key to identify the application instance. - * @param handler The broker of this application. - */ - abstract suspend fun submit(application: Application, key: Any, handler: SendRef<MachineEvent>) -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt deleted file mode 100644 index 2cfeec06..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessObserver.kt +++ /dev/null @@ -1,77 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute.scheduling - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.ReceiveRef -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.resources.compute.MachineRef -import com.atlarge.opendc.core.workload.application.Application -import com.atlarge.opendc.core.workload.application.Pid -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive - -/** - * An interface for observing processes. - */ -interface ProcessObserver { - /** - * This method is invoked when the setup of an application completed successfully. - * - * @param pid The process id of the process that has been initialized. - */ - fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) - - /** - * This method is invoked when a process exits. - * - * @property pid A reference to the application instance. - * @property status The exit code of the task, where zero means successful. - */ - fun onTermination(instance: MachineRef, pid: Pid, status: Int) - - companion object { - /** - * Create the [Behavior] for a [ProcessObserver]. - * - * @param observer The observer to create the behavior for. - */ - suspend operator fun invoke(ctx: ProcessContext, observer: ProcessObserver, main: ReceiveRef<Any>) { - val inlet = ctx.listen(main) - - coroutineScope { - while (isActive) { - when (val msg = inlet.receive()) { - is MachineEvent.Submitted -> observer.onSubmission(msg.instance, msg.application, msg.key, msg.pid) - is MachineEvent.Terminated -> observer.onTermination(msg.instance, msg.pid, msg.status) - } - } - } - - inlet.close() - } - } -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt deleted file mode 100644 index daf71af4..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessView.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute.scheduling - -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.workload.application.Application -import com.atlarge.opendc.core.workload.application.Pid -import com.atlarge.opendc.core.workload.application.ProcessMessage - -/** - * A process represents a application instance running on a particular machine from the machine scheduler's point of - * view. - * - * @property application The application this is an instance of. - * @property broker The broker of the process, which is informed about its progress. - * @property pid The reference to the application instance. - * @property state The state of the process. - */ -data class ProcessView( - val application: Application, - val broker: SendRef<MachineEvent>, - val pid: Pid, - var state: ProcessState = ProcessState.CREATED -) { - /** - * The slice of processing elements allocated for the process. Available as soon as the state - * becomes [ProcessState.RUNNING] - */ - lateinit var allocation: ProcessMessage.Allocation -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt deleted file mode 100644 index 6cdc3ea5..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/SpaceSharedMachineScheduler.kt +++ /dev/null @@ -1,189 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute.scheduling - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.sendOnce -import com.atlarge.opendc.core.resources.compute.Machine -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.resources.compute.MachineRef -import com.atlarge.opendc.core.resources.compute.ProcessingElement -import com.atlarge.opendc.core.workload.application.Application -import com.atlarge.opendc.core.workload.application.Pid -import com.atlarge.opendc.core.workload.application.ProcessEvent -import com.atlarge.opendc.core.workload.application.ProcessMessage -import com.atlarge.opendc.core.workload.application.ProcessSupervisor -import java.util.ArrayDeque -import java.util.UUID -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch - -/** - * A machine scheduling policy where processes are space-shared on the machine. - * - * Space-sharing for machine scheduling means that all running processes will be allocated a separate set of the - * [ProcessingElement]s in a [Machine]. Applications are scheduled on the machine in first-in-first-out (FIFO) order, - * thus larger applications may block smaller tasks from proceeding, while space is available (no backfilling). - * - * @property ctx The context in which the scheduler runs. - * @property machine The machine to create the scheduler for. - */ -class SpaceSharedMachineScheduler( - ctx: ProcessContext, - coroutineScope: CoroutineScope, - machine: Machine, - machineRef: MachineRef -) : MachineSchedulerLogic(ctx, coroutineScope, machine, machineRef), ProcessSupervisor { - private var cores = 0 - private val available = ArrayDeque<ProcessingElement>() - private val queue = ArrayDeque<SendRef<ProcessMessage>>() - private val running = LinkedHashSet<SendRef<ProcessMessage>>() - private val processes = HashMap<SendRef<ProcessMessage>, ProcessView>() - private val jobs = HashMap<SendRef<ProcessMessage>, Job>() - private val channel = ctx.open<ProcessEvent>() - - init { - coroutineScope.launch { - ProcessSupervisor(ctx, this@SpaceSharedMachineScheduler, channel.receive) - } - } - - override suspend fun updateResources(cores: List<ProcessingElement>) { - available.addAll(cores) - this.cores = cores.size - - // Add all running tasks in front of the queue - running.reversed().forEach { queue.addFirst(it) } - running.clear() - - reschedule() - } - - override suspend fun submit(application: Application, key: Any, handler: SendRef<MachineEvent>) { - val channel = ctx.open<ProcessMessage>() - val pid = channel.send - // Create application instance on the machine - ctx.spawn({ application(it, pid, channel.receive) }, name = application.name + ":" + application.uid + ":" + UUID.randomUUID().toString()) - processes[pid] = ProcessView(application, handler, pid) - - // Inform the owner that the task has been submitted - handler.sendOnce(MachineEvent.Submitted(machineRef, application, key, pid)) - - // Setup the task - pid.sendOnce(ProcessMessage.Setup(machine, this@SpaceSharedMachineScheduler.channel.send)) - } - - /** - * Reschedule the tasks on this machine. - */ - private fun reschedule() { - while (queue.isNotEmpty()) { - val pid = queue.peek() - val process = processes[pid]!! - - if (process.application.cores >= cores) { - // The task will never fit on the machine - // TODO Fail task - println("Process $process will not fit in machine: dropping.") - queue.remove() - return - } else if (process.application.cores > available.size) { - // The task will not fit at the moment - // Try again if resources become available - // ctx.log.debug("Application queued: not enough processing elements available [requested={}, available={}]", - // process.application.cores, available.size) - return - } - queue.remove() - - // Compute the available resources - val resources = List(process.application.cores) { - val pe = available.poll() - Pair(pe, 1.0) - }.toMap() - process.state = ProcessState.RUNNING - process.allocation = ProcessMessage.Allocation(resources, Long.MAX_VALUE) - running += pid - - coroutineScope.launch(Dispatchers.Unconfined) { - pid.sendOnce(process.allocation) - } - } - } - - override fun onReady(pid: Pid) { - val process = processes[pid]!! - - // Schedule the task if it has been setup - queue.add(pid) - process.state = ProcessState.READY - - reschedule() - } - - override fun onConsume(pid: Pid, utilization: Map<ProcessingElement, Double>, until: Long) { - val process = processes[pid]!! - val allocation = process.allocation - - if (until > allocation.until) { - // Tasks are not allowed to extend allocation provided by the machine - // TODO Fail the task - println("Task $pid must not extend allocation provided by the machine") - } else if (until < allocation.until) { - // Shrink allocation - process.allocation = allocation.copy(until = until) - } - - // Reschedule the process after the allocation expires - jobs[pid] = coroutineScope.launch { - delay(process.allocation.until - ctx.clock.millis()) - // We just extend the allocation - process.allocation = process.allocation.copy(until = Long.MAX_VALUE) - pid.sendOnce(process.allocation) - } - } - - override fun onExit(pid: Pid, status: Int) { - val process = processes.remove(pid)!! - running -= pid - jobs[pid]?.cancel() - process.allocation.resources.keys.forEach { available.add(it) } - - // Inform the owner that the task has terminated - coroutineScope.launch { - process.broker.sendOnce(MachineEvent.Terminated(machineRef, pid, status)) - } - } - - companion object : MachineScheduler { - override fun invoke(ctx: ProcessContext, coroutineScope: CoroutineScope, machine: Machine, machineRef: MachineRef): MachineSchedulerLogic { - return SpaceSharedMachineScheduler(ctx, coroutineScope, machine, machineRef) - } - } -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt deleted file mode 100644 index 8a022112..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisionEvent.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute.supervision - -import com.atlarge.opendc.core.resources.compute.Machine -import com.atlarge.opendc.core.resources.compute.MachineRef - -/** - * A supervision protocol for [Machine] instances. - */ -sealed class MachineSupervisionEvent { - /** - * Initialization message to introduce to the supervisor a new machine by specifying its static information and - * address. - * - * @property machine The machine that is being announced. - * @property ref The address to talk to the host. - */ - data class Announce(val machine: Machine, val ref: MachineRef) : MachineSupervisionEvent() - - /** - * Indicate that the specified machine has booted up. - * - * @property ref The address to talk to the machine. - */ - data class Up(val ref: MachineRef) : MachineSupervisionEvent() -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt deleted file mode 100644 index 37cf9d44..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/supervision/MachineSupervisor.kt +++ /dev/null @@ -1,76 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.resources.compute.supervision - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.ReceiveRef -import com.atlarge.opendc.core.resources.compute.Machine -import com.atlarge.opendc.core.resources.compute.MachineRef -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive - -/** - * An interface for supervising [Machine] instances. - */ -interface MachineSupervisor { - /** - * This method is invoked when a new machine is introduced to the supervisor by specifying its static information - * and address. - * - * @param machine The machine that is being announced. - * @param ref The address to talk to the host. - */ - fun onAnnounce(machine: Machine, ref: MachineRef) - - /** - * This method is invoked when a process exits. - * - * @param ref The address to talk to the machine. - */ - fun onUp(ref: MachineRef) - - companion object { - /** - * Create the [Behavior] for a [MachineSupervisor]. - * - * @param supervisor The supervisor to create the behavior for. - */ - suspend operator fun invoke(ctx: ProcessContext, supervisor: MachineSupervisor, main: ReceiveRef<MachineSupervisionEvent>) { - val inlet = ctx.listen(main) - - coroutineScope { - while (isActive) { - when (val msg = inlet.receive()) { - is MachineSupervisionEvent.Announce -> supervisor.onAnnounce(msg.machine, msg.ref) - is MachineSupervisionEvent.Up -> supervisor.onUp(msg.ref) - } - } - } - - inlet.close() - } - } -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/Service.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceKey.kt index 8dcec760..290bf439 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/Service.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceKey.kt @@ -32,16 +32,16 @@ import java.util.UUID * * @param T The shape of the messages the service responds to. */ -interface Service<T : Any> : Identity +interface ServiceKey<T : Any> : Identity /** - * Helper class for constructing a [Service]. + * Helper class for constructing a [ServiceKey]. * * @property uid The unique identifier of the service. * @property name The name of the service. */ -abstract class AbstractService<T : Any>(override val uid: UUID, override val name: String) : Service<T> { - override fun equals(other: Any?): Boolean = other is Service<*> && uid == other.uid +abstract class AbstractServiceKey<T : Any>(override val uid: UUID, override val name: String) : ServiceKey<T> { + override fun equals(other: Any?): Boolean = other is ServiceKey<*> && uid == other.uid override fun hashCode(): Int = uid.hashCode() - override fun toString(): String = "Service[uid=$uid,name=$name]" + override fun toString(): String = "ServiceKey[uid=$uid, name=$name]" } diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt deleted file mode 100644 index 3592d578..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceProvider.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.services - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.Identity -import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.ZoneMessage -import java.util.UUID - -/** - * An abstract representation of a cloud service implementation provided by a cloud platform. - */ -interface ServiceProvider : Identity { - /** - * The unique identifier of the service implementation. - */ - override val uid: UUID - - /** - * The name of the service implementation. - */ - override val name: String - - /** - * The set of services provided by this [ServiceProvider]. - */ - val provides: Set<Service<*>> - - /** - * The dependencies of the service implementation. - */ - val dependencies: Set<Service<*>> - - /** - * Build the runtime [Behavior] for this service. - * - * @param zone The zone model for which the service should be build. - * @param zoneRef The runtime reference to the zone's actor for communication. - * @param main The channel on which the service should listen. - */ - suspend operator fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef<ZoneMessage>, main: Channel<Any>) -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceMap.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt index 14cf4845..d9a85231 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceMap.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistry.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,26 +24,29 @@ package com.atlarge.opendc.core.services -import com.atlarge.odcsim.SendRef - /** - * A map containing services. + * A service registry for a datacenter zone. */ -interface ServiceMap { +public interface ServiceRegistry { /** - * Determine if this map contains the service with the specified [Service]. + * Determine if this map contains the service with the specified [ServiceKey]. * * @param key The key of the service to check for. * @return `true` if the service is in the map, `false` otherwise. */ - operator fun contains(key: Service<*>): Boolean + public operator fun contains(key: ServiceKey<*>): Boolean /** - * Obtain the service with the specified [Service]. + * Obtain the service with the specified [ServiceKey]. * * @param key The key of the service to obtain. * @return The references to the service. * @throws IllegalArgumentException if the key does not exists in the map. */ - operator fun <T : Any> get(key: Service<T>): SendRef<T> + public operator fun <T : Any> get(key: ServiceKey<T>): T + + /** + * Register the specified [ServiceKey] in this registry. + */ + public operator fun <T : Any> set(key: ServiceKey<T>, service: T) } diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Broker.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt index a3d6b0a7..91147839 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/Broker.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/ServiceRegistryImpl.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,20 +22,25 @@ * SOFTWARE. */ -package com.atlarge.opendc.core - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef +package com.atlarge.opendc.core.services /** - * A broker acting on the various cloud platforms on behalf of the user. + * Default implementation of the [ServiceRegistry] interface. */ -interface Broker { +public class ServiceRegistryImpl : ServiceRegistry { /** - * Build the runtime behavior of the [Broker]. - * - * @param platforms A list of available cloud platforms. - * @return The runtime behavior of the broker. + * The map containing the registered services. */ - suspend operator fun invoke(ctx: ProcessContext, platforms: List<SendRef<PlatformMessage>>) + private val services: MutableMap<ServiceKey<*>, Any> = mutableMapOf() + + override fun <T : Any> set(key: ServiceKey<T>, service: T) { + services[key] = service + } + + override fun contains(key: ServiceKey<*>): Boolean = key in services + + override fun <T : Any> get(key: ServiceKey<T>): T { + @Suppress("UNCHECKED_CAST") + return services[key] as T + } } diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt deleted file mode 100644 index 604e1942..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/ProvisioningService.kt +++ /dev/null @@ -1,76 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.services.provisioning - -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.services.AbstractService -import com.atlarge.opendc.core.services.Service -import com.atlarge.opendc.core.services.ServiceProvider -import com.atlarge.opendc.core.services.resources.HostView -import java.util.UUID - -/** - * A cloud platform service that provisions the native resources on the platform. - * - * This service assumes control over all hosts in its [Zone]. - */ -abstract class ProvisioningService : ServiceProvider { - override val provides: Set<Service<*>> = setOf(ProvisioningService) - - /** - * The service key of the provisioner service. - */ - companion object : AbstractService<ProvisioningMessage>(UUID.randomUUID(), "provisioner") -} - -/** - * A message protocol for communicating to the resource provisioner. - */ -sealed class ProvisioningMessage { - /** - * Request the specified number of resources from the provisioner. - * - * @property numHosts The number of hosts to request from the provisioner. - * @property replyTo The actor to reply to. - */ - data class Request(val numHosts: Int, val replyTo: SendRef<ProvisioningResponse.Lease>) : ProvisioningMessage() - - /** - * Release the specified resource [ProvisioningResponse.Lease]. - * - * @property lease The lease to release. - */ - data class Release(val lease: ProvisioningResponse.Lease) : ProvisioningMessage() -} - -/** - * A message protocol used by the resource provisioner to respond to [ProvisioningMessage]s. - */ -sealed class ProvisioningResponse { - /** - * A lease for the specified hosts. - */ - data class Lease(val hosts: List<HostView>) : ProvisioningResponse() -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt deleted file mode 100644 index 5f77e1a1..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/provisioning/SimpleProvisioningService.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.services.provisioning - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.ask -import com.atlarge.odcsim.sendOnce -import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.ZoneMessage -import com.atlarge.opendc.core.find -import com.atlarge.opendc.core.resources.compute.MachineRef -import com.atlarge.opendc.core.services.Service -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.core.services.resources.ResourceManagementMessage -import com.atlarge.opendc.core.services.resources.ResourceManagementResponse -import com.atlarge.opendc.core.services.resources.ResourceManagementService -import java.util.ArrayDeque -import java.util.UUID -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive - -/** - * A cloud platform service that provisions the native resources on the platform. - * - * This service assumes control over all hosts in its [Zone]. - */ -object SimpleProvisioningService : ProvisioningService() { - override val uid: UUID = UUID.randomUUID() - override val name: String = "simple-provisioner" - override val dependencies: Set<Service<*>> = setOf(ResourceManagementService) - - /** - * Build the runtime [Behavior] for the resource provisioner, responding to messages of shape [ProvisioningMessage]. - */ - override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef<ZoneMessage>, main: Channel<Any>) { - val inlet = ctx.listen(main.receive) - val manager = zoneRef.find(ResourceManagementService) - - delay(10) - - val hosts = mutableMapOf<MachineRef, HostView>() - val available = ArrayDeque<HostView>() - val leases = mutableSetOf<ProvisioningResponse.Lease>() - - // Subscribe to all machines in the zone - for (cluster in zone.clusters) { - for (host in cluster.hosts) { - val msg: ResourceManagementResponse.Listing = manager.ask { ResourceManagementMessage.Lookup(host, it) } - if (msg.instance != null) { - hosts[msg.instance.ref] = msg.instance - available.add(msg.instance) - } - } - } - - coroutineScope { - while (isActive) { - when (val msg = inlet.receive()) { - is ProvisioningMessage.Request -> { - println("Provisioning ${msg.numHosts} hosts") - val leaseHosts = mutableListOf<HostView>() - while (available.isNotEmpty() && leaseHosts.size < msg.numHosts) { - leaseHosts += available.poll() - } - val lease = ProvisioningResponse.Lease(leaseHosts) - leases += lease - msg.replyTo.sendOnce(lease) - } - is ProvisioningMessage.Release -> { - val lease = msg.lease - if (lease in leases) { - return@coroutineScope - } - available.addAll(lease.hosts) - leases -= lease - } - } - } - } - } -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt deleted file mode 100644 index 60dd2eb9..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/HostView.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.services.resources - -import com.atlarge.opendc.core.resources.compute.MachineRef -import com.atlarge.opendc.core.resources.compute.MachineStatus -import com.atlarge.opendc.core.resources.compute.host.Host - -/** - * The dynamic information of a [Host] instance that is being tracked by the [ResourceManagementService]. This means - * that information may not be up-to-date. - * - * @property host The static information of the host. - * @property ref The reference to the host's actor. - * @property status The status of the machine. - */ -data class HostView(val host: Host, val ref: MachineRef, val status: MachineStatus = MachineStatus.HALT) { - override fun equals(other: Any?): Boolean = other is HostView && host.uid == other.host.uid - override fun hashCode(): Int = host.uid.hashCode() -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt deleted file mode 100644 index cc032952..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/services/resources/ResourceManagementService.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.services.resources - -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.sendOnce -import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.ZoneMessage -import com.atlarge.opendc.core.resources.compute.MachineRef -import com.atlarge.opendc.core.resources.compute.MachineStatus -import com.atlarge.opendc.core.resources.compute.host.Host -import com.atlarge.opendc.core.resources.compute.supervision.MachineSupervisionEvent -import com.atlarge.opendc.core.services.Service -import com.atlarge.opendc.core.services.ServiceProvider -import java.util.UUID -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive - -/** - * A cloud platform service that manages the native resources on the platform. - * - * This service assumes control over all hosts in its [Zone]. - */ -object ResourceManagementService : ServiceProvider, Service<ResourceManagementMessage> { - override val uid: UUID = UUID.randomUUID() - override val name: String = "resource-manager" - override val provides: Set<Service<*>> = setOf(ResourceManagementService) - override val dependencies: Set<Service<*>> = emptySet() - - /** - * Build the runtime behavior of the [ResourceManagementService]. - */ - override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef<ZoneMessage>, main: Channel<Any>) { - // Launch the clusters of the zone - for (cluster in zone.clusters) { - ctx.spawn({ cluster(it, main.send) }, name = "${cluster.name}-${cluster.uid}") - } - - val hosts = mutableMapOf<MachineRef, HostView>() - val inlet = ctx.listen(main.receive) - - coroutineScope { - while (isActive) { - when (val msg = inlet.receive()) { - is MachineSupervisionEvent.Announce -> { - val host = msg.machine as? Host - if (host != null) { - hosts[msg.ref] = HostView(host, msg.ref) - } - } - is MachineSupervisionEvent.Up -> { - hosts.computeIfPresent(msg.ref) { _, value -> - value.copy(status = MachineStatus.RUNNING) - } - } - is ResourceManagementMessage.Lookup -> { - msg.replyTo.sendOnce(ResourceManagementResponse.Listing(hosts.values.find { it.host == msg.host })) - } - } - } - } - } -} - -/** - * A reference to the resource manager of a zone. - */ -typealias ResourceManagerRef = SendRef<ResourceManagementMessage> - -/** - * A message protocol for communicating to the resource manager. - */ -sealed class ResourceManagementMessage { - /** - * Lookup the specified [Host]. - * - * @property host The host to lookup. - * @property replyTo The address to sent the response to. - */ - data class Lookup( - val host: Host, - val replyTo: SendRef<ResourceManagementResponse.Listing> - ) : ResourceManagementMessage() -} - -/** - * A message protocol used by the resource manager to respond to [ResourceManagementMessage]s. - */ -sealed class ResourceManagementResponse { - /** - * A response to a [ResourceManagementMessage.Lookup] request. - * - * @property instance The instance that was found or `null` if it does not exist. - */ - data class Listing(val instance: HostView?) : ResourceManagementResponse() -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt deleted file mode 100644 index d1ccd347..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Application.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.workload.application - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.ReceiveRef -import com.atlarge.opendc.core.workload.Workload - -/** - * A generic representation of a workload that can directly be executed by physical or virtual compute resources, - * such as a web server application. - */ -interface Application : Workload { - /** - * The number of processing elements required by the task. - */ - val cores: Int - - /** - * Build the runtime [Behavior] of an application, accepting messages of [ProcessMessage]. - * - * This is a model for the runtime behavior of an application instance (process) that describes how an application - * instance consumes the allocated resources on a machine. - */ - suspend operator fun invoke(ctx: ProcessContext, pid: Pid, main: ReceiveRef<ProcessMessage>) -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt deleted file mode 100644 index a2dbacf1..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/FlopsApplication.kt +++ /dev/null @@ -1,114 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.workload.application - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.ReceiveRef -import com.atlarge.odcsim.sendOnce -import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.resources.compute.ProcessingElement -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.min -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive - -/** - * An [Application] implementation that models applications performing a static number of floating point operations - * ([flops]) on a compute resource. - * - * @property uid A unique identifier for this application. - * @property name The name of the application. - * @property owner The owner of this application. - * @property cores The number of cores needed for this application. - * @property flops The number of floating point operations to perform for this task. - */ -class FlopsApplication( - override val uid: UUID, - override val name: String, - override val owner: User, - override val cores: Int, - val flops: Long -) : Application { - - init { - require(flops >= 0) { "Negative number of flops" } - } - - /** - * Build the runtime [Behavior] based on a number of floating point operations to execute. - */ - override suspend fun invoke(ctx: ProcessContext, pid: Pid, main: ReceiveRef<ProcessMessage>) { - val inlet = ctx.listen(main) - var remaining = flops - var start: Long = 0 - lateinit var allocation: Map<ProcessingElement, Double> - - val created = inlet.receive() as ProcessMessage.Setup - val ref = created.ref - - ref.sendOnce(ProcessEvent.Ready(pid)) - - suspend fun processAllocation(resources: Map<ProcessingElement, Double>, until: Long) { - start = ctx.clock.millis() - allocation = resources - .asSequence() - .take(cores) - .associateBy({ it.key }, { it.value }) - - val speed = allocation.asSequence() - .map { (key, value) -> key.unit.clockRate * value } - .average() - val finishedAt = ceil(ctx.clock.millis() + remaining / speed).toLong() - ref.sendOnce(ProcessEvent.Consume(pid, allocation, min(finishedAt, until))) - } - - var msg = inlet.receive() as ProcessMessage.Allocation - processAllocation(msg.resources, msg.until) - - coroutineScope { - while (isActive) { - msg = inlet.receive() as ProcessMessage.Allocation - - /* Compute the consumption of flops */ - val consumed = allocation.asSequence() - .map { (key, value) -> key.unit.clockRate * value * (ctx.clock.millis() - start) } - .sum() - // Ceil to prevent consumed flops being rounded to 0 - remaining -= ceil(consumed).toLong() - - /* Test whether all flops have been consumed and the task is finished */ - if (remaining <= 0) { - ref.sendOnce(ProcessEvent.Exit(pid, 0)) - break - } - processAllocation(msg.resources, msg.until) - } - } - - inlet.close() - } -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt deleted file mode 100644 index fc70b924..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/Process.kt +++ /dev/null @@ -1,90 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.workload.application - -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.resources.compute.Machine -import com.atlarge.opendc.core.resources.compute.ProcessingElement - -/** - * The process id (pid) is a reference to the application instance (process) that accepts messages of - * type [ProcessMessage]. - */ -typealias Pid = SendRef<ProcessMessage> - -/** - * A message protocol for actors to communicate with task instances (called processes). - */ -sealed class ProcessMessage { - /** - * Indicate that the task should be installed to the specified machine. - * - * @property machine The machine to install the task. - * @property ref The reference to the machine instance. - */ - data class Setup(val machine: Machine, val ref: SendRef<ProcessEvent>) : ProcessMessage() - - /** - * Indicate an allocation of compute resources on a machine for a certain duration. - * The task may assume that the reservation occurs after installation on the same machine. - * - * @property resources The cpu cores (and the utilization percentages) allocated for the task. - * @property until The point in time till which the reservation is valid. - */ - data class Allocation(val resources: Map<ProcessingElement, Double>, val until: Long) : ProcessMessage() -} - -/** - * The message protocol used by application instances respond to [ProcessMessage]s. - */ -sealed class ProcessEvent { - /** - * Indicate that the process is ready to start processing. - * - * @property pid A reference to the application instance. - */ - data class Ready(val pid: Pid) : ProcessEvent() - - /** - * Indicate the estimated resource utilization of the task until a specified point in time. - * - * @property pid A reference to the application instance of the represented utilization. - * @property utilization The utilization of the cpu cores as a percentage. - * @property until The point in time until which the utilization is valid. - */ - data class Consume( - val pid: Pid, - val utilization: Map<ProcessingElement, Double>, - val until: Long - ) : ProcessEvent() - - /** - * Indicate that a process has been terminated. - * - * @property pid A reference to the application instance. - * @property status The exit code of the task, where zero means successful. - */ - data class Exit(val pid: Pid, val status: Int) : ProcessEvent() -} diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt deleted file mode 100644 index fefd6c88..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/application/ProcessSupervisor.kt +++ /dev/null @@ -1,83 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.core.workload.application - -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.ReceiveRef -import com.atlarge.opendc.core.resources.compute.ProcessingElement -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive - -/** - * An interface for supervising processes. - */ -interface ProcessSupervisor { - /** - * This method is invoked when the setup of an application completed successfully. - * - * @param pid The process id of the process that has been initialized. - */ - fun onReady(pid: Pid) {} - - /** - * This method is invoked when a process informs the machine that it is running with the - * estimated resource utilization until a specified point in time. - * - * @param pid The process id of the process that is running. - * @param utilization The utilization of the cpu cores as a percentage. - * @param until The point in time until which the utilization is valid. - */ - fun onConsume(pid: Pid, utilization: Map<ProcessingElement, Double>, until: Long) {} - - /** - * This method is invoked when a process exits. - * - * @property pid A reference to the application instance. - * @property status The exit code of the task, where zero means successful. - */ - fun onExit(pid: Pid, status: Int) {} - - companion object { - /** - * Create the [Behavior] for a [ProcessSupervisor]. - * - * @param supervisor The supervisor to create the behavior for. - */ - suspend operator fun invoke(ctx: ProcessContext, supervisor: ProcessSupervisor, main: ReceiveRef<ProcessEvent>) { - val inlet = ctx.listen(main) - coroutineScope { - while (isActive) { - when (val msg = inlet.receive()) { - is ProcessEvent.Ready -> supervisor.onReady(msg.pid) - is ProcessEvent.Consume -> supervisor.onConsume(msg.pid, msg.utilization, msg.until) - is ProcessEvent.Exit -> supervisor.onExit(msg.pid, msg.status) - } - } - } - inlet.close() - } - } -} diff --git a/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt b/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt index ffd1604e..3dcea99d 100644 --- a/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt +++ b/opendc/opendc-experiments-tpds/src/main/kotlin/com/atlarge/opendc/experiments/tpds/TestExperiment.kt @@ -24,23 +24,13 @@ package com.atlarge.opendc.experiments.tpds -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.core.Broker -import com.atlarge.opendc.core.Model -import com.atlarge.opendc.core.PlatformMessage -import com.atlarge.opendc.core.find -import com.atlarge.opendc.core.services.provisioning.SimpleProvisioningService -import com.atlarge.opendc.core.services.resources.ResourceManagementService -import com.atlarge.opendc.core.zones +import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.service.StageWorkflowScheduler -import com.atlarge.opendc.workflows.service.WorkflowEvent -import com.atlarge.opendc.workflows.service.WorkflowMessage +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor +import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode -import com.atlarge.opendc.workflows.service.WorkflowService import com.atlarge.opendc.workflows.service.stage.job.FifoJobSortingPolicy import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy @@ -48,13 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDyn import com.atlarge.opendc.workflows.service.stage.task.FifoTaskSortingPolicy import com.atlarge.opendc.workflows.service.stage.task.FunctionalTaskEligibilityPolicy import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task import java.io.File import java.util.ServiceLoader import kotlin.math.max -import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking /** @@ -66,75 +55,62 @@ fun main(args: Array<String>) { return } - val scheduler = StageWorkflowScheduler( - mode = WorkflowSchedulerMode.Batch(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobSortingPolicy = FifoJobSortingPolicy(), - taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), - taskSortingPolicy = FifoTaskSortingPolicy(), - resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), - resourceSelectionPolicy = FirstFitResourceSelectionPolicy() - ) - val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) .use { it.read() } - .let { env -> - env.copy(platforms = env.platforms.map { platform -> - platform.copy(zones = platform.zones.map { zone -> - val services = zone.services + setOf(ResourceManagementService, SimpleProvisioningService, WorkflowService(scheduler)) - zone.copy(services = services) - }) - }) + + var total = 0 + var finished = 0 + + val token = Channel<Boolean>() + + val monitor = object : WorkflowMonitor { + override suspend fun onJobStart(job: Job, time: Long) { + println("Job ${job.uid} submitted") + total += 1 } - val broker = object : Broker { - override suspend fun invoke(ctx: ProcessContext, platforms: List<SendRef<PlatformMessage>>) { - coroutineScope { - val zones = platforms.first().zones() - val service = zones.values.first().find(WorkflowService) - val activeJobs = mutableSetOf<Job>() - val channel = ctx.open<WorkflowEvent>() - val outlet = ctx.connect(service) - val inlet = ctx.listen(channel.receive) - - launch { - val reader = GwfTraceReader(File(args[0])) - - while (reader.hasNext() && isActive) { - val (time, job) = reader.next() - delay(max(0, time - ctx.clock.millis())) - outlet.send(WorkflowMessage.Submit(job, channel.send)) - } - } - - var total = 0 - var finished = 0 - - while (isActive) { - when (val msg = inlet.receive()) { - is WorkflowEvent.JobSubmitted -> { - println("Job ${msg.job.uid} submitted") - total += 1 - } - is WorkflowEvent.JobStarted -> { - activeJobs += msg.job - } - is WorkflowEvent.JobFinished -> { - activeJobs -= msg.job - finished += 1 - println("Jobs $finished/$total finished (${msg.job.tasks.size} tasks)") - if (activeJobs.isEmpty()) - return@coroutineScope - } - } - } + override suspend fun onJobFinish(job: Job, time: Long) { + finished += 1 + println("Jobs $finished/$total finished (${job.tasks.size} tasks)") + + if (finished == total) { + token.send(true) } } + + override suspend fun onTaskStart(job: Job, task: Task, time: Long) { + println("Task started ${task.uid}") + } + + override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { + println("Task finished ${task.uid}") + } } - val model = Model(environment, listOf(broker)) - val factory = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = factory({ model(it) }, name = "sim") + val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + val system = provider({ ctx -> + val scheduler = StageWorkflowService( + ctx, + environment.platforms[0].zones[0].services[ProvisioningService.Key], + mode = WorkflowSchedulerMode.Batch(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobSortingPolicy = FifoJobSortingPolicy(), + taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), + taskSortingPolicy = FifoTaskSortingPolicy(), + resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), + resourceSelectionPolicy = FirstFitResourceSelectionPolicy() + ) + + val reader = GwfTraceReader(File(args[0])) + + while (reader.hasNext()) { + val (time, job) = reader.next() + delay(max(0, time - ctx.clock.millis())) + scheduler.submit(job, monitor) + } + + token.receive() + }, name = "sim") runBlocking { system.run() diff --git a/opendc/opendc-format/build.gradle.kts b/opendc/opendc-format/build.gradle.kts index 5f9ac1ec..21b0dc57 100644 --- a/opendc/opendc-format/build.gradle.kts +++ b/opendc/opendc-format/build.gradle.kts @@ -31,6 +31,7 @@ plugins { dependencies { api(project(":opendc:opendc-core")) + api(project(":opendc:opendc-compute")) api(project(":opendc:opendc-workflows")) api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") implementation(kotlin("stdlib")) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index ad111e74..7436778f 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -24,20 +24,22 @@ package com.atlarge.opendc.format.environment.sc18 -import com.atlarge.opendc.core.Cluster +import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.metal.driver.FakeBareMetalDriver +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService import com.atlarge.opendc.core.Environment import com.atlarge.opendc.core.Platform import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.resources.compute.ProcessingElement -import com.atlarge.opendc.core.resources.compute.ProcessingUnit -import com.atlarge.opendc.core.resources.compute.host.Host -import com.atlarge.opendc.core.resources.compute.scheduling.SpaceSharedMachineScheduler +import com.atlarge.opendc.core.services.ServiceRegistryImpl import com.atlarge.opendc.format.environment.EnvironmentReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import java.io.InputStream import java.util.UUID +import kotlinx.coroutines.runBlocking /** * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Datacenter @@ -54,29 +56,39 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb init { val setup = mapper.readValue<Setup>(input) - val clusters = setup.rooms.mapIndexed { i, room -> - var counter = 0 - val hosts = room.objects.flatMap { roomObject -> + var counter = 0 + val nodes = setup.rooms.flatMap { room -> + room.objects.flatMap { roomObject -> when (roomObject) { is RoomObject.Rack -> { roomObject.machines.map { machine -> - val cores = machine.cpus.flatMap { id -> + val cores = machine.cpus.map { id -> when (id) { - 1 -> List(4) { ProcessingElement(it, CPUS[0]) } - 2 -> List(2) { ProcessingElement(it, CPUS[1]) } + 1 -> ProcessingUnit("Intel", "Core(TM) i7-6920HQ", "amd64", 4100.0, 4) + 2 -> ProcessingUnit("Intel", "Core(TM) I7-6920HQ", "amd64", 3500.0, 2) else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - Host(UUID.randomUUID(), "node-${counter++}", SpaceSharedMachineScheduler, cores) + val flavor = Flavor(cores) + FakeBareMetalDriver(UUID.randomUUID(), "node-${counter++}", flavor) } } } } - Cluster(UUID.randomUUID(), "cluster-$i", hosts) } + val provisioningService = SimpleProvisioningService() + runBlocking { + for (node in nodes) { + provisioningService.create(node) + } + } + + val serviceRegistry = ServiceRegistryImpl() + serviceRegistry[ProvisioningService.Key] = provisioningService + val platform = Platform(UUID.randomUUID(), "sc18-platform", listOf( - Zone(UUID.randomUUID(), "zone", emptySet(), clusters) + Zone(UUID.randomUUID(), "zone", serviceRegistry) )) environment = Environment(setup.name, null, listOf(platform)) @@ -85,11 +97,4 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb override fun read(): Environment = environment override fun close() {} - - companion object { - val CPUS = arrayOf( - ProcessingUnit("Intel", 6, 6920, "Intel(R) Core(TM) i7-6920HQ CPU @ 4.10GHz", 4100.0, 1), - ProcessingUnit("Intel", 6, 6930, "Intel(R) Core(TM) i7-6920HQ CPU @ 3.50GHz", 3500.0, 1) - ) - } } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt index 407a5f4e..33db78c9 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.format.trace.gwf +import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.workload.application.FlopsApplication import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import com.atlarge.opendc.workflows.workload.Job @@ -120,7 +120,7 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { val workflow = entry.workload val task = Task( UUID(0L, taskId), "<unnamed>", - FlopsApplication(UUID(0L, taskId), "<unnamed>", workflow.owner, cores, flops), + FlopsApplicationImage(flops, cores), HashSet() ) entry.submissionTime = min(entry.submissionTime, submitTime) diff --git a/opendc/opendc-workflows/build.gradle.kts b/opendc/opendc-workflows/build.gradle.kts index 6aa044e8..08455368 100644 --- a/opendc/opendc-workflows/build.gradle.kts +++ b/opendc/opendc-workflows/build.gradle.kts @@ -31,6 +31,7 @@ plugins { dependencies { api(project(":opendc:opendc-core")) + api(project(":opendc:opendc-compute")) implementation(kotlin("stdlib")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt index e9e9a53e..3c77d57a 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/scheduling/ProcessState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,29 +22,32 @@ * SOFTWARE. */ -package com.atlarge.opendc.core.resources.compute.scheduling +package com.atlarge.opendc.workflows.monitor + +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task /** - * An enumeration of the distinct states of an application instance (process). + * An interface for monitoring the progression of workflows. */ -enum class ProcessState { +public interface WorkflowMonitor { /** - * Default state of a process, where the task is waiting to be assigned and installed on a machine. + * This method is invoked when a job has become active. */ - CREATED, + public suspend fun onJobStart(job: Job, time: Long) /** - * State to indicate that the process is waiting to be ran. + * This method is invoked when a job has finished processing. */ - READY, + public suspend fun onJobFinish(job: Job, time: Long) /** - * State to indicate that the process is currently running. + * This method is invoked when a task of a job has started processing. */ - RUNNING, + public suspend fun onTaskStart(job: Job, task: Task, time: Long) /** - * State to indicate that the process has been terminated, either successfully or due to failure. + * This method is invoked when a task has finished processing. */ - TERMINATED, + public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt deleted file mode 100644 index d4240421..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy -import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy -import kotlinx.coroutines.CoroutineScope - -/** - * A [WorkflowScheduler] that distributes work through a multi-stage process based on the Reference Architecture for - * Datacenter Scheduling. - */ -class StageWorkflowScheduler( - private val mode: WorkflowSchedulerMode, - private val jobAdmissionPolicy: JobAdmissionPolicy, - private val jobSortingPolicy: JobSortingPolicy, - private val taskEligibilityPolicy: TaskEligibilityPolicy, - private val taskSortingPolicy: TaskSortingPolicy, - private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, - private val resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowScheduler { - override fun invoke( - ctx: ProcessContext, - self: WorkflowServiceRef, - coroutineScope: CoroutineScope, - lease: ProvisioningResponse.Lease - ): WorkflowSchedulerLogic { - return StageWorkflowSchedulerLogic(ctx, self, coroutineScope, lease, mode, jobAdmissionPolicy, - jobSortingPolicy, taskEligibilityPolicy, taskSortingPolicy, resourceDynamicFilterPolicy, resourceSelectionPolicy) - } -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index c6162f5e..d7b29c32 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -25,18 +25,12 @@ package com.atlarge.opendc.workflows.service import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.sendOnce -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.resources.compute.MachineMessage -import com.atlarge.opendc.core.resources.compute.MachineRef -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.core.workload.application.Application -import com.atlarge.opendc.core.workload.application.Pid +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy @@ -45,18 +39,16 @@ import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch /** - * Logic of the [StageWorkflowScheduler]. + * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for + * Datacenter Scheduling. */ -class StageWorkflowSchedulerLogic( - ctx: ProcessContext, - self: WorkflowServiceRef, - coroutineScope: CoroutineScope, - lease: ProvisioningResponse.Lease, +class StageWorkflowService( + private val ctx: ProcessContext, + private val provisioningService: ProvisioningService, private val mode: WorkflowSchedulerMode, private val jobAdmissionPolicy: JobAdmissionPolicy, private val jobSortingPolicy: JobSortingPolicy, @@ -64,7 +56,7 @@ class StageWorkflowSchedulerLogic( private val taskSortingPolicy: TaskSortingPolicy, private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, private val resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowSchedulerLogic(ctx, self, coroutineScope, lease) { +) : WorkflowService, ServerMonitor { /** * The incoming jobs ready to be processed by the scheduler. @@ -77,30 +69,30 @@ class StageWorkflowSchedulerLogic( internal val activeJobs: MutableSet<JobView> = mutableSetOf() /** - * The running tasks by [Pid]. + * The running tasks by [Server]. */ - internal val taskByPid = mutableMapOf<Pid, TaskView>() + internal val taskByServer = mutableMapOf<Server, TaskView>() /** - * The available processor cores on the leased machines. + * The nodes that are controlled by the service. */ - internal val machineCores: MutableMap<HostView, Int> = HashMap() + internal lateinit var nodes: List<Node> - private val brokers: MutableMap<SendRef<WorkflowEvent>, SendPort<WorkflowEvent>> = HashMap() - private val channel = ctx.open<MachineEvent>() + /** + * The available nodes. + */ + internal val available: MutableSet<Node> = mutableSetOf() init { - lease.hosts.forEach { machineCores[it] = it.host.cores.count() } - coroutineScope.launch { - ProcessObserver(ctx, this@StageWorkflowSchedulerLogic, channel.receive) + ctx.launch { + nodes = provisioningService.nodes().toList() + available.addAll(nodes) } } - override suspend fun submit(job: Job, handler: SendRef<WorkflowEvent>) { - val broker = brokers.getOrPut(handler) { ctx.connect(handler) } - + override suspend fun submit(job: Job, monitor: WorkflowMonitor) { // J1 Incoming Jobs - val jobInstance = JobView(job, handler) + val jobInstance = JobView(job, monitor) val instances = job.tasks.associateWith { TaskView(jobInstance, it) } @@ -113,13 +105,12 @@ class StageWorkflowSchedulerLogic( // If the task has no dependency, it is a root task and can immediately be evaluated if (instance.isRoot) { - instance.state = ProcessState.READY + instance.state = TaskState.READY } } jobInstance.tasks = instances.values.toMutableSet() incomingJobs += jobInstance - broker.send(WorkflowEvent.JobSubmitted(self, job, ctx.clock.millis())) requestCycle() } @@ -131,20 +122,18 @@ class StageWorkflowSchedulerLogic( private fun requestCycle() { when (mode) { is WorkflowSchedulerMode.Interactive -> { - coroutineScope.launch { + ctx.launch { schedule() } } is WorkflowSchedulerMode.Batch -> { if (next == null) { - val job = coroutineScope.launch { + val job = ctx.launch { delay(mode.quantum) + next = null schedule() } next = job - job.invokeOnCompletion { - next = null - } } } } @@ -153,14 +142,15 @@ class StageWorkflowSchedulerLogic( /** * Perform a scheduling cycle immediately. */ - override suspend fun schedule() { + private suspend fun schedule() { // J2 Create list of eligible jobs jobAdmissionPolicy.startCycle(this) val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) } + for (jobInstance in eligibleJobs) { incomingJobs -= jobInstance activeJobs += jobInstance - brokers.getValue(jobInstance.broker).send(WorkflowEvent.JobStarted(self, jobInstance.job, ctx.clock.millis())) + jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis()) } // J3 Sort jobs on criterion @@ -177,15 +167,17 @@ class StageWorkflowSchedulerLogic( // T3 Per task for (instance in sortedTasks) { - val hosts = resourceDynamicFilterPolicy(this, lease.hosts, instance) + val hosts = resourceDynamicFilterPolicy(this, nodes, instance) val host = resourceSelectionPolicy.select(this, hosts, instance) if (host != null) { // T4 Submit task to machine - host.ref.sendOnce(MachineMessage.Submit(instance.task.application, instance, channel.send)) - instance.host = host - instance.state = ProcessState.RUNNING // Assume the application starts running - machineCores.merge(host, instance.task.application.cores, Int::minus) + available -= host + instance.state = TaskState.ACTIVE + + val newHost = provisioningService.deploy(host, instance.task.image, this) + instance.host = newHost + taskByServer[newHost.server!!] = instance } else { return } @@ -193,32 +185,32 @@ class StageWorkflowSchedulerLogic( } } - override fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) { - val task = key as TaskView - task.pid = pid - taskByPid[pid] = task - - brokers.getValue(task.job.broker).send(WorkflowEvent.TaskStarted(self, task.job.job, task.task, ctx.clock.millis())) - } - - override fun onTermination(instance: MachineRef, pid: Pid, status: Int) { - val task = taskByPid.remove(pid) ?: throw IllegalStateException() - - val job = task.job - task.state = ProcessState.TERMINATED - job.tasks.remove(task) - machineCores.merge(task.host!!, task.task.application.cores, Int::plus) - brokers.getValue(job.broker).send(WorkflowEvent.TaskFinished(self, job.job, task.task, status, ctx.clock.millis())) + override suspend fun onUpdate(server: Server, previousState: ServerState) { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis()) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskState.FINISHED + job.tasks.remove(task) + available += task.host!! + job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis()) + + if (job.isFinished) { + activeJobs -= job + job.monitor.onJobFinish(job.job, ctx.clock.millis()) + } - if (job.isFinished) { - activeJobs -= job - brokers.getValue(job.broker).send(WorkflowEvent.JobFinished(self, job.job, ctx.clock.millis())) + requestCycle() + } + else -> throw IllegalStateException() } - - requestCycle() } - class JobView(val job: Job, val broker: SendRef<WorkflowEvent>) { + class JobView(val job: Job, val monitor: WorkflowMonitor) { /** * A flag to indicate whether this job is finished. */ @@ -245,19 +237,17 @@ class StageWorkflowSchedulerLogic( val isRoot: Boolean get() = dependencies.isEmpty() - var state: ProcessState = ProcessState.CREATED + var state: TaskState = TaskState.CREATED set(value) { field = value // Mark the process as terminated in the graph - if (value == ProcessState.TERMINATED) { + if (value == TaskState.FINISHED) { markTerminated() } } - var pid: Pid? = null - - var host: HostView? = null + var host: Node? = null /** * Mark the specified [TaskView] as terminated. @@ -267,7 +257,7 @@ class StageWorkflowSchedulerLogic( dependent.dependencies.remove(this) if (dependent.isRoot) { - dependent.state = ProcessState.READY + dependent.state = TaskState.READY } } } diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/MachineStatus.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt index af039bcc..ee0024f5 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/resources/compute/MachineStatus.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,12 +22,14 @@ * SOFTWARE. */ -package com.atlarge.opendc.core.resources.compute +package com.atlarge.opendc.workflows.service /** - * The status of a machine. + * The state of a workflow task. */ -enum class MachineStatus { - HALT, - RUNNING +public enum class TaskState { + CREATED, + READY, + ACTIVE, + FINISHED } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt deleted file mode 100644 index 6d6d4179..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import kotlinx.coroutines.CoroutineScope - -/** - * A factory interface for constructing a [WorkflowSchedulerLogic]. - */ -interface WorkflowScheduler { - /** - * Construct a [WorkflowSchedulerLogic] in the given [ActorContext]. - * - * @param ctx The context in which the scheduler runs. - * @param timers The timer scheduler to use. - * @param lease The resource lease to use. - */ - operator fun invoke( - ctx: ProcessContext, - self: WorkflowServiceRef, - coroutineScope: CoroutineScope, - lease: ProvisioningResponse.Lease - ): WorkflowSchedulerLogic -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt deleted file mode 100644 index 0b3ba828..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.workflows.workload.Job -import kotlinx.coroutines.CoroutineScope - -/** - * A workflow scheduler interface that schedules jobs across machines. - * - * @property ctx The context in which the scheduler runs. - * @property timers The timer scheduler to use. - * @property lease The resource lease to use. - */ -abstract class WorkflowSchedulerLogic( - protected val ctx: ProcessContext, - protected val self: WorkflowServiceRef, - protected val coroutineScope: CoroutineScope, - protected val lease: ProvisioningResponse.Lease -) : ProcessObserver { - /** - * Submit the specified workflow for scheduling. - */ - abstract suspend fun submit(job: Job, handler: SendRef<WorkflowEvent>) - - /** - * Trigger an immediate scheduling cycle. - */ - abstract suspend fun schedule() -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index bed6b93b..524f4f9e 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -24,161 +24,24 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.ask -import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.ZoneMessage -import com.atlarge.opendc.core.find -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.services.AbstractService -import com.atlarge.opendc.core.services.Service -import com.atlarge.opendc.core.services.ServiceProvider -import com.atlarge.opendc.core.services.provisioning.ProvisioningMessage -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.core.services.provisioning.ProvisioningService +import com.atlarge.opendc.core.services.AbstractServiceKey +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import java.util.UUID -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive /** * A service for cloud workflow management. * * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al. */ -class WorkflowService(val scheduler: WorkflowScheduler) : ServiceProvider { - override val uid: UUID = UUID.randomUUID() - override val name: String = "workflows" - override val provides: Set<Service<*>> = setOf(WorkflowService) - override val dependencies: Set<Service<*>> = setOf(ProvisioningService) - - /** - * Build the runtime [Behavior] for the workflow service, responding to messages of shape [WorkflowMessage]. - */ - override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef<ZoneMessage>, main: Channel<Any>) { - coroutineScope { - val inlet = ctx.listen(main.receive) - val provisioner = zoneRef.find(ProvisioningService) - // Wait for 0.1 sec before asking the provisioner to allow it to initialize. Will return empty response if asked - // immediately. - delay(10) - val lease: ProvisioningResponse.Lease = provisioner.ask { ProvisioningMessage.Request(Int.MAX_VALUE, it) } - val schedulerLogic = scheduler(ctx, main.send, this, lease) - - while (isActive) { - when (val msg = inlet.receive()) { - is WorkflowMessage.Submit -> { - schedulerLogic.submit(msg.job, msg.broker) - } - is MachineEvent.Submitted -> { - schedulerLogic.onSubmission(msg.instance, msg.application, msg.key, msg.pid) - } - is MachineEvent.Terminated -> { - schedulerLogic.onTermination(msg.instance, msg.pid, msg.status) - } - } - } - } - } - - companion object : AbstractService<WorkflowMessage>(UUID.randomUUID(), "workflows") -} - -/** - * A reference to the workflow service instance. - */ -typealias WorkflowServiceRef = SendRef<WorkflowMessage> - -/** - * A message protocol for communicating to the workflow service. - */ -sealed class WorkflowMessage { +public interface WorkflowService { /** * Submit the specified [Job] to the workflow service for scheduling. - * - * @property job The workflow to submit for scheduling. - * @property broker The broker that has submitted this workflow on behalf of a user and that needs to be kept - * up-to-date. - */ - data class Submit(val job: Job, val broker: SendRef<WorkflowEvent>) : WorkflowMessage() -} - -/** - * A message protocol used by the workflow service to respond to [WorkflowMessage]s. - */ -sealed class WorkflowEvent { - /** - * Indicate that the specified [Job] was submitted to the workflow service. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that has been submitted. - * @property time A timestamp of the moment the job was received. - */ - data class JobSubmitted( - val service: WorkflowServiceRef, - val job: Job, - val time: Long - ) : WorkflowEvent() - - /** - * Indicate that the specified [Job] has become active. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that has been submitted. - * @property time A timestamp of the moment the job started. - */ - data class JobStarted( - val service: WorkflowServiceRef, - val job: Job, - val time: Long - ) : WorkflowEvent() - - /** - * Indicate that the specified [Task] has started processing. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that contains this task. - * @property task The task that has started processing. - * @property time A timestamp of the moment the task started. - */ - data class TaskStarted( - val service: WorkflowServiceRef, - val job: Job, - val task: Task, - val time: Long - ) : WorkflowEvent() - - /** - * Indicate that the specified [Task] has started processing. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that contains this task. - * @property task The task that has started processing. - * @property status The exit code of the task, where zero means successful. - * @property time A timestamp of the moment the task finished. */ - data class TaskFinished( - val service: WorkflowServiceRef, - val job: Job, - val task: Task, - val status: Int, - val time: Long - ) : WorkflowEvent() + public suspend fun submit(job: Job, monitor: WorkflowMonitor) /** - * Indicate that the specified [Job] has finished processing. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that has finished processing. - * @property time A timestamp of the moment the task finished. + * The service key for the workflow scheduler. */ - data class JobFinished( - val service: WorkflowServiceRef, - val job: Job, - val time: Long - ) : WorkflowEvent() + companion object Key : AbstractServiceKey<WorkflowService>(UUID.randomUUID(), "workflows") } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt index 333ed35a..976fbbf3 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt @@ -24,14 +24,14 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue. */ class FifoJobSortingPolicy : JobSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - jobs: Collection<StageWorkflowSchedulerLogic.JobView> - ): List<StageWorkflowSchedulerLogic.JobView> = jobs.toList() + scheduler: StageWorkflowService, + jobs: Collection<StageWorkflowService.JobView> + ): List<StageWorkflowService.JobView> = jobs.toList() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt index d3a5d9a6..cdaad512 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -24,10 +24,10 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** - * A policy interface for admitting [StageWorkflowSchedulerLogic.JobView]s to a scheduling cycle. + * A policy interface for admitting [StageWorkflowService.JobView]s to a scheduling cycle. */ interface JobAdmissionPolicy { /** @@ -35,14 +35,14 @@ interface JobAdmissionPolicy { * * @param scheduler The scheduler that started the cycle. */ - fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + fun startCycle(scheduler: StageWorkflowService) {} /** - * Determine whether the specified [StageWorkflowSchedulerLogic.JobView] should be admitted to the scheduling cycle. + * Determine whether the specified [StageWorkflowService.JobView] should be admitted to the scheduling cycle. * * @param scheduler The scheduler that should admit or reject the job. * @param job The workflow that has been submitted. * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise. */ - fun shouldAdmit(scheduler: StageWorkflowSchedulerLogic, job: StageWorkflowSchedulerLogic.JobView): Boolean + fun shouldAdmit(scheduler: StageWorkflowService, job: StageWorkflowService.JobView): Boolean } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt index ada3e693..c3a5dab5 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A policy interface for ordering admitted workflows in the scheduling queue. @@ -38,7 +38,7 @@ interface JobSortingPolicy { * @return The sorted list of jobs. */ operator fun invoke( - scheduler: StageWorkflowSchedulerLogic, - jobs: Collection<StageWorkflowSchedulerLogic.JobView> - ): List<StageWorkflowSchedulerLogic.JobView> + scheduler: StageWorkflowService, + jobs: Collection<StageWorkflowService.JobView> + ): List<StageWorkflowService.JobView> } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt index f877403b..ad90839c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [JobAdmissionPolicy] that admits all jobs. @@ -34,7 +34,7 @@ object NullJobAdmissionPolicy : JobAdmissionPolicy { * Admit every submitted job. */ override fun shouldAdmit( - scheduler: StageWorkflowSchedulerLogic, - job: StageWorkflowSchedulerLogic.JobView + scheduler: StageWorkflowService, + job: StageWorkflowService.JobView ): Boolean = true } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt index 30d5c456..9ce2811c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService import kotlin.random.Random /** @@ -34,7 +34,7 @@ import kotlin.random.Random */ class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - jobs: Collection<StageWorkflowSchedulerLogic.JobView> - ): List<StageWorkflowSchedulerLogic.JobView> = jobs.shuffled(random) + scheduler: StageWorkflowService, + jobs: Collection<StageWorkflowService.JobView> + ): List<StageWorkflowService.JobView> = jobs.shuffled(random) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt index c3307063..e2490214 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt @@ -24,17 +24,17 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [ResourceSelectionPolicy] that selects the first machine that is available. */ class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { override fun select( - scheduler: StageWorkflowSchedulerLogic, - machines: List<HostView>, - task: StageWorkflowSchedulerLogic.TaskView - ): HostView? = + scheduler: StageWorkflowService, + machines: List<Node>, + task: StageWorkflowService.TaskView + ): Node? = machines.firstOrNull() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt index d742f842..a8f2fda9 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for @@ -33,11 +33,11 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic */ class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - machines: List<HostView>, - task: StageWorkflowSchedulerLogic.TaskView - ): List<HostView> { + scheduler: StageWorkflowService, + machines: List<Node>, + task: StageWorkflowService.TaskView + ): List<Node> { return machines - .filter { scheduler.machineCores[it] ?: 0 >= task.task.application.cores } + .filter { it in scheduler.available } } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt index 8a3b5a1e..8d8ceec2 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding @@ -42,8 +42,8 @@ interface ResourceDynamicFilterPolicy { * @return The machines on which the task can be scheduled. */ operator fun invoke( - scheduler: StageWorkflowSchedulerLogic, - machines: List<HostView>, - task: StageWorkflowSchedulerLogic.TaskView - ): List<HostView> + scheduler: StageWorkflowService, + machines: List<Node>, + task: StageWorkflowService.TaskView + ): List<Node> } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt index 90b2873c..38fe5886 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected @@ -41,8 +41,8 @@ interface ResourceSelectionPolicy { * @return The selected machine or `null` if no machine could be found. */ fun select( - scheduler: StageWorkflowSchedulerLogic, - machines: List<HostView>, - task: StageWorkflowSchedulerLogic.TaskView - ): HostView? + scheduler: StageWorkflowService, + machines: List<Node>, + task: StageWorkflowService.TaskView + ): Node? } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt index 48a1a50d..bba81d27 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt @@ -24,14 +24,14 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue. */ class FifoTaskSortingPolicy : TaskSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - tasks: Collection<StageWorkflowSchedulerLogic.TaskView> - ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.toList() + scheduler: StageWorkflowService, + tasks: Collection<StageWorkflowService.TaskView> + ): List<StageWorkflowService.TaskView> = tasks.toList() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt index 1672633e..72ecbee2 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt @@ -24,15 +24,15 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState /** * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. */ class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy { override fun isEligible( - scheduler: StageWorkflowSchedulerLogic, - task: StageWorkflowSchedulerLogic.TaskView - ): Boolean = task.state == ProcessState.READY + scheduler: StageWorkflowService, + task: StageWorkflowService.TaskView + ): Boolean = task.state == TaskState.READY } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt index 36ef3a50..1b1d5b44 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService import kotlin.random.Random /** @@ -34,7 +34,7 @@ import kotlin.random.Random */ class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - tasks: Collection<StageWorkflowSchedulerLogic.TaskView> - ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.shuffled(random) + scheduler: StageWorkflowService, + tasks: Collection<StageWorkflowService.TaskView> + ): List<StageWorkflowService.TaskView> = tasks.shuffled(random) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt index 19f0240b..19954d7b 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A policy interface for determining the eligibility of tasks in a scheduling cycle. @@ -35,14 +35,14 @@ interface TaskEligibilityPolicy { * * @param scheduler The scheduler that started the cycle. */ - fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + fun startCycle(scheduler: StageWorkflowService) {} /** - * Determine whether the specified [StageWorkflowSchedulerLogic.TaskView] is eligible to be scheduled. + * Determine whether the specified [StageWorkflowService.TaskView] is eligible to be scheduled. * * @param scheduler The scheduler that is determining whether the task is eligible. * @param task The task instance to schedule. * @return `true` if the task eligible to be scheduled, `false` otherwise. */ - fun isEligible(scheduler: StageWorkflowSchedulerLogic, task: StageWorkflowSchedulerLogic.TaskView): Boolean + fun isEligible(scheduler: StageWorkflowService, task: StageWorkflowService.TaskView): Boolean } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt index 6a65ed69..aabc44a9 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the @@ -39,7 +39,7 @@ interface TaskSortingPolicy { * @return The sorted list of tasks. */ operator fun invoke( - scheduler: StageWorkflowSchedulerLogic, - tasks: Collection<StageWorkflowSchedulerLogic.TaskView> - ): List<StageWorkflowSchedulerLogic.TaskView> + scheduler: StageWorkflowService, + tasks: Collection<StageWorkflowService.TaskView> + ): List<StageWorkflowService.TaskView> } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt index 25fe7348..b5997b35 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.workload +import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity -import com.atlarge.opendc.core.workload.application.Application import java.util.UUID /** @@ -33,13 +33,13 @@ import java.util.UUID * * @property uid A unique identified of this task. * @property name The name of this task. - * @property application The application to run as part of this workflow task. + * @property image The application image to run as part of this workflow task. * @property dependencies The dependencies of this task in order for it to execute. */ data class Task( override val uid: UUID, override val name: String, - val application: Application, + val image: Image, val dependencies: Set<Task> ) : Identity { override fun equals(other: Any?): Boolean = other is Task && uid == other.uid |
