diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-09 20:47:06 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-03-09 20:47:06 +0100 |
| commit | 3b6fbe0b535bf3398f120373f59f87adbba34005 (patch) | |
| tree | bc880252a935cc0b1558c50fe83f71d21b735d29 /simulator | |
| parent | 66c2501d95b167f9e7474a45e542f82d2d8e83ff (diff) | |
| parent | 40e5871e01858a55372bfcb51cf90069c080e751 (diff) | |
compute: Improvements to cloud compute model (v2)
This is the second in the series of pull requests to improve the existing cloud compute model (see #86). This pull request removes the dependency on the bare-metal provisioning code which simplifies experiment setup tremendously:
- Remove bare-metal provisioning code (opendc-metal)
- Remove opendc-core which was a relic of the previous codebase and was only used sparingly.
- Move ownership of Server, Image and Flavor to the compute service. Users are expected to create instances via the compute service.
Diffstat (limited to 'simulator')
119 files changed, 1534 insertions, 2456 deletions
diff --git a/simulator/README.md b/simulator/README.md index 61ef1d43..d02925d6 100644 --- a/simulator/README.md +++ b/simulator/README.md @@ -23,7 +23,7 @@ This component is responsible for modelling and simulation of datacenters and th - **[opendc-compute](opendc-compute)** The [Infrastructure as a Service](https://en.wikipedia.org/wiki/Infrastructure_as_a_Service) (IaaS) component of OpenDC for computing infrastructure (similar to [Amazon EC2](https://aws.amazon.com/ec2/) and [Google Compute Engine](https://cloud.google.com/compute)). -- **[opendc-workflows](opendc-workflows)** +- **[opendc-workflow](opendc-workflow)** Workflow orchestration service built on top of OpenDC. - **[opendc-format](opendc-format)** Collection of libraries for processing data formats related to (simulation of) cloud computing and datacenters. diff --git a/simulator/opendc-compute/opendc-compute-api/build.gradle.kts b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts index 10046322..835dbbb8 100644 --- a/simulator/opendc-compute/opendc-compute-api/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts @@ -29,5 +29,4 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) } diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt index 025513e6..baa1ba2f 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -22,21 +22,95 @@ package org.opendc.compute.api +import java.util.UUID + /** * A client interface for the OpenDC Compute service. */ public interface ComputeClient : AutoCloseable { /** + * Obtain the list of [Flavor]s accessible by the requesting user. + */ + public suspend fun queryFlavors(): List<Flavor> + + /** + * Obtain a [Flavor] by its unique identifier. + * + * @param id The identifier of the flavor. + */ + public suspend fun findFlavor(id: UUID): Flavor? + + /** + * Create a new [Flavor] instance at this compute service. + * + * @param name The name of the flavor. + * @param cpuCount The amount of CPU cores for this flavor. + * @param memorySize The size of the memory. + * @param labels The identifying labels of the image. + * @param meta The non-identifying meta-data of the image. + */ + public suspend fun newFlavor( + name: String, + cpuCount: Int, + memorySize: Long, + labels: Map<String, String> = emptyMap(), + meta: Map<String, Any> = emptyMap() + ): Flavor + + /** + * Obtain the list of [Image]s accessible by the requesting user. + */ + public suspend fun queryImages(): List<Image> + + /** + * Obtain a [Image] by its unique identifier. + * + * @param id The identifier of the image. + */ + public suspend fun findImage(id: UUID): Image? + + /** + * Create a new [Image] instance at this compute service. + * + * @param name The name of the image. + * @param labels The identifying labels of the image. + * @param meta The non-identifying meta-data of the image. + */ + public suspend fun newImage( + name: String, + labels: Map<String, String> = emptyMap(), + meta: Map<String, Any> = emptyMap() + ): Image + + /** + * Obtain the list of [Server]s accessible by the requesting user. + */ + public suspend fun queryServers(): List<Server> + + /** + * Obtain a [Server] by its unique identifier. + * + * @param id The identifier of the server. + */ + public suspend fun findServer(id: UUID): Server? + + /** * Create a new [Server] instance at this compute service. * * @param name The name of the server to deploy. * @param image The image to be deployed. * @param flavor The flavor of the machine instance to run this [image] on. + * @param labels The identifying labels of the server. + * @param meta The non-identifying meta-data of the server. + * @param start A flag to indicate that the server should be started immediately. */ public suspend fun newServer( name: String, image: Image, - flavor: Flavor + flavor: Flavor, + labels: Map<String, String> = emptyMap(), + meta: Map<String, Any> = emptyMap(), + start: Boolean = true ): Server /** diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt index bf5f0ce4..5f511f91 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt @@ -26,14 +26,19 @@ package org.opendc.compute.api * Flavors define the compute and memory capacity of [Server] instance. To put it simply, a flavor is an available * hardware configuration for a server. It defines the size of a virtual server that can be launched. */ -public data class Flavor( +public interface Flavor : Resource { /** * The number of (virtual) processing cores to use. */ - public val cpuCount: Int, + public val cpuCount: Int /** * The amount of RAM available to the server (in MB). */ public val memorySize: Long -) + + /** + * Delete the flavor instance. + */ + public suspend fun delete() +} diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt index 280c4d89..83e63b81 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt @@ -22,27 +22,12 @@ package org.opendc.compute.api -import org.opendc.core.resource.Resource -import org.opendc.core.resource.TagContainer -import java.util.* - /** * An image containing a bootable operating system that can directly be executed by physical or virtual server. - * - * OpenStack: A collection of files used to create or rebuild a server. Operators provide a number of pre-built OS - * images by default. You may also create custom images from cloud servers you have launched. These custom images are - * useful for backup purposes or for producing “gold” server images if you plan to deploy a particular server - * configuration frequently. */ -public data class Image( - public override val uid: UUID, - public override val name: String, - public override val tags: TagContainer -) : Resource { - public companion object { - /** - * An empty boot disk [Image] that exits immediately on start. - */ - public val EMPTY: Image = Image(UUID.randomUUID(), "empty", emptyMap()) - } +public interface Image : Resource { + /** + * Delete the image instance. + */ + public suspend fun delete() } diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt index ca98dab0..8fbb7308 100644 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,13 +20,10 @@ * SOFTWARE. */ -package org.opendc.metal - -/* - * Common metadata keys for bare-metal nodes. - */ +package org.opendc.compute.api /** - * The cluster to which the node belongs. + * This exception is thrown to indicate that the compute service does not have enough capacity at the moment to + * fulfill a launch request. */ -public const val NODE_CLUSTER: String = "bare-metal:cluster" +public class InsufficientServerCapacityException(override val cause: Throwable? = null) : Exception("There was insufficient capacity available to satisfy the launch request") diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Resource.kt index 64a47277..08120848 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Resource.kt @@ -22,25 +22,34 @@ package org.opendc.compute.api -import org.opendc.core.User -import org.opendc.core.workload.Workload import java.util.UUID /** - * A workload that represents a VM. - * - * @property uid A unique identified of this VM. - * @property name The name of this VM. - * @property owner The owner of the VM. - * @property image The image of the VM. + * A generic resource provided by the OpenDC Compute service. */ -public data class ComputeWorkload( - override val uid: UUID, - override val name: String, - override val owner: User, - val image: Image -) : Workload { - override fun equals(other: Any?): Boolean = other is ComputeWorkload && uid == other.uid +public interface Resource { + /** + * The unique identifier of the resource. + */ + public val uid: UUID + + /** + * The name of the resource. + */ + public val name: String + + /** + * The identifying labels attached to the resource. + */ + public val labels: Map<String, String> + + /** + * The non-identifying metadata attached to the resource. + */ + public val meta: Map<String, Any> - override fun hashCode(): Int = uid.hashCode() + /** + * Refresh the local state of the resource. + */ + public suspend fun refresh() } diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt index ab1eb860..b508a9f8 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt @@ -22,18 +22,11 @@ package org.opendc.compute.api -import org.opendc.core.resource.Resource - /** * A stateful object representing a server instance that is running on some physical or virtual machine. */ public interface Server : Resource { /** - * The name of the server. - */ - public override val name: String - - /** * The flavor of the server. */ public val flavor: Flavor @@ -44,14 +37,33 @@ public interface Server : Resource { public val image: Image /** - * The tags assigned to the server. + * The last known state of the server. */ - public override val tags: Map<String, String> + public val state: ServerState /** - * The last known state of the server. + * Request the server to be started. + * + * This method is guaranteed to return after the request was acknowledged, but might return before the server was + * started. */ - public val state: ServerState + public suspend fun start() + + /** + * Request the server to be stopped. + * + * This method is guaranteed to return after the request was acknowledged, but might return before the server was + * stopped. + */ + public suspend fun stop() + + /** + * Request the server to be deleted. + * + * This method is guaranteed to return after the request was acknowledged, but might return before the server was + * deleted. + */ + public suspend fun delete() /** * Register the specified [ServerWatcher] to watch the state of the server. @@ -66,9 +78,4 @@ public interface Server : Resource { * @param watcher The watcher to de-register from the server. */ public fun unwatch(watcher: ServerWatcher) - - /** - * Refresh the local state of the resource. - */ - public suspend fun refresh() } diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt index 25d2e519..a4d7d7d7 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt @@ -27,27 +27,27 @@ package org.opendc.compute.api */ public enum class ServerState { /** - * The server has not yet finished the original build process. + * Resources are being allocated for the instance. The instance is not running yet. */ - BUILD, + PROVISIONING, /** - * The server was powered down by the user. + * A user shut down the instance. */ - SHUTOFF, + TERMINATED, /** - * The server is active and running. + * The server instance is booting up or running. */ - ACTIVE, + RUNNING, /** - * The server is in error. + * The server is in an error state. */ ERROR, /** - * The state of the server is unknown. + * The server has been deleted and cannot be started later on. */ - UNKNOWN, + DELETED, } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index 2cd91144..c3c39572 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -51,6 +51,11 @@ public interface Host { public val state: HostState /** + * Meta-data associated with the host. + */ + public val meta: Map<String, Any> + + /** * The events emitted by the driver. */ public val events: Flow<HostEvent> @@ -74,7 +79,7 @@ public interface Host { public operator fun contains(server: Server): Boolean /** - * Stat the server [instance][server] if it is currently not running on this host. + * Start the server [instance][server] if it is currently not running on this host. * * @throws IllegalArgumentException if the server is not present on the host. */ @@ -88,9 +93,9 @@ public interface Host { public suspend fun stop(server: Server) /** - * Terminate the specified [instance][server] on this host and cleanup all resources associated with it. + * Delete the specified [instance][server] on this host and cleanup all resources associated with it. */ - public suspend fun terminate(server: Server) + public suspend fun delete(server: Server) /** * Add a [HostListener] to this host. diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt index 6548767e..29f10e27 100644 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt @@ -20,45 +20,43 @@ * SOFTWARE. */ -package org.opendc.metal.service +package org.opendc.compute.service.internal -import org.opendc.compute.api.Image -import org.opendc.core.services.AbstractServiceKey -import org.opendc.metal.Node -import org.opendc.metal.driver.BareMetalDriver +import org.opendc.compute.api.Flavor import java.util.UUID /** - * A cloud platform service for provisioning bare-metal compute nodes on the platform. + * A [Flavor] implementation that is passed to clients but delegates its implementation to another class. */ -public interface ProvisioningService { - /** - * Create a new bare-metal compute node. - */ - public suspend fun create(driver: BareMetalDriver): Node +internal class ClientFlavor(private val delegate: Flavor) : Flavor { + override val uid: UUID = delegate.uid - /** - * Obtain the available nodes. - */ - public suspend fun nodes(): Set<Node> + override var name: String = delegate.name + private set - /** - * Refresh the state of a compute node. - */ - public suspend fun refresh(node: Node): Node + override var cpuCount: Int = delegate.cpuCount + private set - /** - * Deploy the specified [Image] on a compute node. - */ - public suspend fun deploy(node: Node, image: Image): Node + override var memorySize: Long = delegate.memorySize + private set - /** - * Stop the specified [Node] . - */ - public suspend fun stop(node: Node): Node + override var labels: Map<String, String> = delegate.labels.toMap() + private set - /** - * The service key of this service. - */ - public companion object Key : AbstractServiceKey<ProvisioningService>(UUID.randomUUID(), "provisioner") + override var meta: Map<String, Any> = delegate.meta.toMap() + private set + + override suspend fun delete() { + delegate.delete() + } + + override suspend fun refresh() { + delegate.refresh() + + name = delegate.name + cpuCount = delegate.cpuCount + memorySize = delegate.memorySize + labels = delegate.labels + meta = delegate.meta + } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt index b24f80da..6c5b2ab0 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,31 +20,36 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.compute.service.internal -import kotlinx.coroutines.flow.Flow -import org.opendc.core.services.AbstractServiceKey -import org.opendc.workflows.workload.Job +import org.opendc.compute.api.Image import java.util.* /** - * A service for cloud workflow management. - * - * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. + * An [Image] implementation that is passed to clients but delegates its implementation to another class. */ -public interface WorkflowService { - /** - * The events emitted by the workflow scheduler. - */ - public val events: Flow<WorkflowEvent> - - /** - * Submit the specified [Job] to the workflow service for scheduling. - */ - public suspend fun submit(job: Job) - - /** - * The service key for the workflow scheduler. - */ - public companion object Key : AbstractServiceKey<WorkflowService>(UUID.randomUUID(), "workflows") +internal class ClientImage(private val delegate: Image) : Image { + override val uid: UUID = delegate.uid + + override var name: String = delegate.name + private set + + override var labels: Map<String, String> = delegate.labels.toMap() + private set + + override var meta: Map<String, Any> = delegate.meta.toMap() + private set + + override suspend fun delete() { + delegate.delete() + refresh() + } + + override suspend fun refresh() { + delegate.refresh() + + name = delegate.name + labels = delegate.labels + meta = delegate.meta + } } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index f84b7435..ae4cee3b 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -46,12 +46,30 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche override var image: Image = delegate.image private set - override var tags: Map<String, String> = delegate.tags.toMap() + override var labels: Map<String, String> = delegate.labels.toMap() + private set + + override var meta: Map<String, Any> = delegate.meta.toMap() private set override var state: ServerState = delegate.state private set + override suspend fun start() { + delegate.start() + refresh() + } + + override suspend fun stop() { + delegate.stop() + refresh() + } + + override suspend fun delete() { + delegate.delete() + refresh() + } + override fun watch(watcher: ServerWatcher) { if (watchers.isEmpty()) { delegate.watch(this) @@ -69,10 +87,13 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche } override suspend fun refresh() { + delegate.refresh() + name = delegate.name flavor = delegate.flavor image = delegate.image - tags = delegate.tags + labels = delegate.labels + meta = delegate.meta state = delegate.state } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 69d6bb59..2c38f7cb 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,11 +22,8 @@ package org.opendc.compute.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.cancel +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService @@ -41,9 +38,7 @@ import org.opendc.utils.TimerScheduler import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* -import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume import kotlin.math.max /** @@ -87,12 +82,27 @@ public class ComputeServiceImpl( /** * The servers that should be launched by the service. */ - private val queue: Deque<LaunchRequest> = ArrayDeque() + private val queue: Deque<SchedulingRequest> = ArrayDeque() /** * The active servers in the system. */ - private val activeServers: MutableSet<Server> = mutableSetOf() + private val activeServers: MutableMap<Server, Host> = mutableMapOf() + + /** + * The registered flavors for this compute service. + */ + internal val flavors = mutableMapOf<UUID, InternalFlavor>() + + /** + * The registered images for this compute service. + */ + internal val images = mutableMapOf<UUID, InternalImage>() + + /** + * The registered servers for this compute service. + */ + private val servers = mutableMapOf<UUID, InternalServer>() public var submittedVms: Int = 0 public var queuedVms: Int = 0 @@ -126,7 +136,74 @@ public class ComputeServiceImpl( override fun newClient(): ComputeClient = object : ComputeClient { private var isClosed: Boolean = false - override suspend fun newServer(name: String, image: Image, flavor: Flavor): Server { + override suspend fun queryFlavors(): List<Flavor> { + check(!isClosed) { "Client is already closed" } + + return flavors.values.map { ClientFlavor(it) } + } + + override suspend fun findFlavor(id: UUID): Flavor? { + check(!isClosed) { "Client is already closed" } + + return flavors[id]?.let { ClientFlavor(it) } + } + + override suspend fun newFlavor( + name: String, + cpuCount: Int, + memorySize: Long, + labels: Map<String, String>, + meta: Map<String, Any> + ): Flavor { + check(!isClosed) { "Client is already closed" } + + val uid = UUID(clock.millis(), random.nextLong()) + val flavor = InternalFlavor( + this@ComputeServiceImpl, + uid, + name, + cpuCount, + memorySize, + labels, + meta + ) + + flavors[uid] = flavor + + return ClientFlavor(flavor) + } + + override suspend fun queryImages(): List<Image> { + check(!isClosed) { "Client is already closed" } + + return images.values.map { ClientImage(it) } + } + + override suspend fun findImage(id: UUID): Image? { + check(!isClosed) { "Client is already closed" } + + return images[id]?.let { ClientImage(it) } + } + + override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image { + check(!isClosed) { "Client is already closed" } + + val uid = UUID(clock.millis(), random.nextLong()) + val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) + + images[uid] = image + + return ClientImage(image) + } + + override suspend fun newServer( + name: String, + image: Image, + flavor: Flavor, + labels: Map<String, String>, + meta: Map<String, Any>, + start: Boolean + ): Server { check(!isClosed) { "Client is closed" } tracer.commit(VmSubmissionEvent(name, image, flavor)) @@ -143,11 +220,36 @@ public class ComputeServiceImpl( ) ) - return suspendCancellableCoroutine { cont -> - val request = LaunchRequest(createServer(name, image, flavor), cont) - queue += request - requestCycle() + val uid = UUID(clock.millis(), random.nextLong()) + val server = InternalServer( + this@ComputeServiceImpl, + uid, + name, + flavor, + image, + labels.toMutableMap(), + meta.toMutableMap() + ) + + servers[uid] = server + + if (start) { + server.start() } + + return ClientServer(server) + } + + override suspend fun findServer(id: UUID): Server? { + check(!isClosed) { "Client is already closed" } + + return servers[id]?.let { ClientServer(it) } + } + + override suspend fun queryServers(): List<Server> { + check(!isClosed) { "Client is already closed" } + + return servers.values.map { ClientServer(it) } } override fun close() { @@ -183,22 +285,31 @@ public class ComputeServiceImpl( scope.cancel() } - private fun createServer( - name: String, - image: Image, - flavor: Flavor - ): Server { - return ServerImpl( - uid = UUID(random.nextLong(), random.nextLong()), - name = name, - flavor = flavor, - image = image - ) + internal fun schedule(server: InternalServer) { + logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } + + queue.add(SchedulingRequest(server)) + requestSchedulingCycle() + } + + internal fun delete(flavor: InternalFlavor) { + checkNotNull(flavors.remove(flavor.uid)) { "Flavor was not known" } } - private fun requestCycle() { - // Bail out in case we have already requested a new cycle. - if (scheduler.isTimerActive(Unit)) { + internal fun delete(image: InternalImage) { + checkNotNull(images.remove(image.uid)) { "Image was not known" } + } + + internal fun delete(server: InternalServer) { + checkNotNull(servers.remove(server.uid)) { "Server was not known" } + } + + /** + * Indicate that a new scheduling cycle is needed due to a change to the service's state. + */ + private fun requestSchedulingCycle() { + // Bail out in case we have already requested a new cycle or the queue is empty. + if (scheduler.isTimerActive(Unit) || queue.isEmpty()) { return } @@ -208,20 +319,28 @@ public class ComputeServiceImpl( val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) scheduler.startSingleTimer(Unit, delay) { - schedule() + doSchedule() } } - private fun schedule() { + /** + * Run a single scheduling iteration. + */ + private fun doSchedule() { while (queue.isNotEmpty()) { - val (server, cont) = queue.peekFirst() - val requiredMemory = server.flavor.memorySize - val selectedHv = allocationLogic.select(availableHosts, server) + val request = queue.peek() - if (selectedHv == null || !selectedHv.host.canFit(server)) { + if (request.isCancelled) { + queue.poll() + continue + } + + val server = request.server + val hv = allocationLogic.select(availableHosts, request.server) + if (hv == null || !hv.host.canFit(server)) { logger.trace { "Server $server selected for scheduling but no capacity available for it." } - if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) { + if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { tracer.commit(VmSubmissionInvalidEvent(server.name)) _events.emit( @@ -247,45 +366,62 @@ public class ComputeServiceImpl( } } - logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.host.uid} ${selectedHv.host.name} ${selectedHv.host.model}" } - queue.poll() - - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - selectedHv.numberOfActiveServers++ - selectedHv.provisionedCores += server.flavor.cpuCount - selectedHv.availableMemory -= requiredMemory // XXX Temporary hack + val host = hv.host - scope.launch { - try { - cont.resume(ClientServer(server)) - selectedHv.host.spawn(server) - activeServers += server + // Remove request from queue + queue.poll() - tracer.commit(VmScheduledEvent(server.name)) - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms + logger.info { "Assigned server $server to host $host." } + try { + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + hv.numberOfActiveServers++ + hv.provisionedCores += server.flavor.cpuCount + hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack + + scope.launch { + try { + server.assignHost(host) + host.spawn(server) + activeServers[server] = host + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) ) - ) - } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + } catch (e: Throwable) { + logger.error("Failed to deploy VM", e) - selectedHv.numberOfActiveServers-- - selectedHv.provisionedCores -= server.flavor.cpuCount - selectedHv.availableMemory += requiredMemory + hv.numberOfActiveServers-- + hv.provisionedCores -= server.flavor.cpuCount + hv.availableMemory += server.flavor.memorySize + } } + } catch (e: Exception) { + logger.warn(e) { "Failed to assign server $server to $host. " } } } } + /** + * A request to schedule an [InternalServer] onto one of the [Host]s. + */ + private data class SchedulingRequest(val server: InternalServer) { + /** + * A flag to indicate that the request is cancelled. + */ + var isCancelled: Boolean = false + } + override fun onStateChanged(host: Host, newState: HostState) { when (newState) { HostState.UP -> { @@ -313,9 +449,7 @@ public class ComputeServiceImpl( ) // Re-schedule on the new machine - if (queue.isNotEmpty()) { - requestCycle() - } + requestSchedulingCycle() } HostState.DOWN -> { logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } @@ -338,19 +472,23 @@ public class ComputeServiceImpl( ) ) - if (queue.isNotEmpty()) { - requestCycle() - } + requestSchedulingCycle() } } } override fun onStateChanged(host: Host, server: Server, newState: ServerState) { - val serverImpl = server as ServerImpl - serverImpl.state = newState - serverImpl.watchers.forEach { it.onStateChanged(server, newState) } + require(server is InternalServer) { "Invalid server type passed to service" } - if (newState == ServerState.SHUTOFF) { + if (server.host != host) { + // This can happen when a server is rescheduled and started on another machine, while being deleted from + // the old machine. + return + } + + server.state = newState + + if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } tracer.commit(VmStoppedEvent(server.name)) @@ -379,36 +517,7 @@ public class ComputeServiceImpl( } // Try to reschedule if needed - if (queue.isNotEmpty()) { - requestCycle() - } + requestSchedulingCycle() } } - - public data class LaunchRequest(val server: Server, val cont: Continuation<Server>) - - private class ServerImpl( - override val uid: UUID, - override val name: String, - override val flavor: Flavor, - override val image: Image - ) : Server { - val watchers = mutableListOf<ServerWatcher>() - - override fun watch(watcher: ServerWatcher) { - watchers += watcher - } - - override fun unwatch(watcher: ServerWatcher) { - watchers -= watcher - } - - override suspend fun refresh() { - // No-op: this object is the source-of-truth - } - - override val tags: Map<String, String> = emptyMap() - - override var state: ServerState = ServerState.BUILD - } } diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt index 1c5c7a8d..95e280df 100644 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt @@ -20,53 +20,45 @@ * SOFTWARE. */ -package org.opendc.metal +package org.opendc.compute.service.internal -import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.core.Identity -import java.util.UUID +import java.util.* /** - * A bare-metal compute node. + * Internal stateful representation of a [Flavor]. */ -public data class Node( - /** - * The unique identifier of the node. - */ - public override val uid: UUID, +internal class InternalFlavor( + private val service: ComputeServiceImpl, + override val uid: UUID, + name: String, + cpuCount: Int, + memorySize: Long, + labels: Map<String, String>, + meta: Map<String, Any> +) : Flavor { + override var name: String = name + private set - /** - * The optional name of the node. - */ - public override val name: String, + override var cpuCount: Int = cpuCount + private set - /** - * Metadata of the node. - */ - public val metadata: Map<String, Any>, + override var memorySize: Long = memorySize + private set - /** - * The last known state of the compute node. - */ - public val state: NodeState, + override val labels: MutableMap<String, String> = labels.toMutableMap() - /** - * The flavor of the node. - */ - public val flavor: Flavor, + override val meta: MutableMap<String, Any> = meta.toMutableMap() - /** - * The boot image of the node. - */ - public val image: Image, + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override suspend fun delete() { + service.delete(this) + } + + override fun equals(other: Any?): Boolean = other is InternalFlavor && uid == other.uid - /** - * The events that are emitted by the node. - */ - public val events: Flow<NodeEvent> -) : Identity { override fun hashCode(): Int = uid.hashCode() - override fun equals(other: Any?): Boolean = other is Node && uid == other.uid } diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceKey.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt index 9078ecdd..86f2f6b9 100644 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceKey.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,26 +20,35 @@ * SOFTWARE. */ -package org.opendc.core.services +package org.opendc.compute.service.internal -import org.opendc.core.Identity +import org.opendc.compute.api.Image import java.util.* /** - * An interface for identifying service implementations of the same type (providing the same service). - * - * @param T The shape of the messages the service responds to. + * Internal stateful representation of an [Image]. */ -public interface ServiceKey<T : Any> : Identity +internal class InternalImage( + private val service: ComputeServiceImpl, + override val uid: UUID, + override val name: String, + labels: Map<String, String>, + meta: Map<String, Any> +) : Image { + + override val labels: MutableMap<String, String> = labels.toMutableMap() + + override val meta: MutableMap<String, Any> = meta.toMutableMap() + + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override suspend fun delete() { + service.delete(this) + } + + override fun equals(other: Any?): Boolean = other is InternalImage && uid == other.uid -/** - * Helper class for constructing a [ServiceKey]. - * - * @property uid The unique identifier of the service. - * @property name The name of the service. - */ -public abstract class AbstractServiceKey<T : Any>(override val uid: UUID, override val name: String) : ServiceKey<T> { - override fun equals(other: Any?): Boolean = other is ServiceKey<*> && uid == other.uid override fun hashCode(): Int = uid.hashCode() - override fun toString(): String = "ServiceKey[uid=$uid, name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt new file mode 100644 index 00000000..ff7c1d15 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.internal + +import mu.KotlinLogging +import org.opendc.compute.api.* +import org.opendc.compute.service.driver.Host +import java.util.UUID + +/** + * Internal implementation of the [Server] interface. + */ +internal class InternalServer( + private val service: ComputeServiceImpl, + override val uid: UUID, + override val name: String, + override val flavor: Flavor, + override val image: Image, + override val labels: MutableMap<String, String>, + override val meta: MutableMap<String, Any> +) : Server { + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The watchers of this server object. + */ + private val watchers = mutableListOf<ServerWatcher>() + + /** + * The [Host] that has been assigned to host the server. + */ + internal var host: Host? = null + + override suspend fun start() { + when (state) { + ServerState.RUNNING -> { + logger.debug { "User tried to start server but server is already running" } + return + } + ServerState.PROVISIONING -> { + logger.debug { "User tried to start server but request is already pending: doing nothing" } + return + } + ServerState.DELETED -> { + logger.warn { "User tried to start terminated server" } + throw IllegalArgumentException("Server is terminated") + } + else -> { + logger.info { "User requested to start server $uid" } + state = ServerState.PROVISIONING + service.schedule(this) + } + } + } + + override suspend fun stop() { + when (state) { + ServerState.PROVISIONING -> {} // TODO Find way to interrupt these + ServerState.RUNNING, ServerState.ERROR -> { + val host = checkNotNull(host) { "Server not running" } + host.stop(this) + } + ServerState.TERMINATED -> {} // No work needed + ServerState.DELETED -> throw IllegalStateException("Server is terminated") + } + } + + override suspend fun delete() { + when (state) { + ServerState.PROVISIONING -> {} // TODO Find way to interrupt these + ServerState.RUNNING -> { + val host = checkNotNull(host) { "Server not running" } + host.delete(this) + service.delete(this) + } + else -> {} // No work needed + } + } + + override fun watch(watcher: ServerWatcher) { + watchers += watcher + } + + override fun unwatch(watcher: ServerWatcher) { + watchers -= watcher + } + + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override var state: ServerState = ServerState.TERMINATED + set(value) { + if (value != field) { + watchers.forEach { it.onStateChanged(this, value) } + } + + field = value + } + + internal fun assignHost(host: Host) { + this.host = host + } + + override fun equals(other: Any?): Boolean = other is InternalServer && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt index 3facb182..ac7b351d 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt @@ -38,7 +38,7 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al ): HostView? { return hypervisors.asIterable() .filter { hv -> - val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long) + val fitsMemory = hv.availableMemory >= (server.image.meta["required-memory"] as Long) val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts index d7d5f002..31fcda2f 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -31,7 +31,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-compute:opendc-compute-service")) - api(project(":opendc-metal")) api(project(":opendc-simulator:opendc-simulator-compute")) api(project(":opendc-simulator:opendc-simulator-failures")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt deleted file mode 100644 index 2405a8f9..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.compute.simulator.power.api.CpuPowerModel -import org.opendc.compute.simulator.power.api.Powerable -import org.opendc.compute.simulator.power.models.ConstantPowerModel -import org.opendc.metal.Node -import org.opendc.metal.NodeEvent -import org.opendc.metal.NodeState -import org.opendc.metal.driver.BareMetalDriver -import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.failures.FailureDomain -import org.opendc.utils.flow.EventFlow -import org.opendc.utils.flow.StateFlow -import java.time.Clock -import java.util.UUID - -/** - * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. - * - * @param coroutineScope The [CoroutineScope] the driver runs in. - * @param clock The virtual clock to keep track of time. - * @param uid The unique identifier of the machine. - * @param name An optional name of the machine. - * @param metadata The initial metadata of the node. - * @param machine The machine model to simulate. - * @param cpuPowerModel The CPU power model of this machine. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public class SimBareMetalDriver( - private val coroutineScope: CoroutineScope, - private val clock: Clock, - uid: UUID, - name: String, - metadata: Map<String, Any>, - machine: SimMachineModel, - cpuPowerModel: CpuPowerModel = ConstantPowerModel(0.0), -) : BareMetalDriver, FailureDomain, Powerable { - /** - * The flavor that corresponds to this machine. - */ - private val flavor = Flavor( - machine.cpus.size, - machine.memory.map { it.size }.sum() - ) - - /** - * The events of the machine. - */ - private val events = EventFlow<NodeEvent>() - - /** - * The machine state. - */ - private val nodeState = - StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, flavor, Image.EMPTY, events)) - - /** - * The [SimBareMetalMachine] we use to run the workload. - */ - private val machine = SimBareMetalMachine(coroutineScope, clock, machine) - - override val node: Flow<Node> = nodeState - - override val usage: Flow<Double> - get() = this.machine.usage - - override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this) - - /** - * The [Job] that runs the simulated workload. - */ - private var job: Job? = null - - override suspend fun init(): Node { - return nodeState.value - } - - override suspend fun start(): Node { - val node = nodeState.value - if (node.state != NodeState.SHUTOFF) { - return node - } - - val workload = node.image.tags["workload"] as SimWorkload - - job = coroutineScope.launch { - delay(1) // TODO Introduce boot time - initMachine() - try { - machine.run(workload, mapOf("driver" to this@SimBareMetalDriver, "node" to node)) - exitMachine(null) - } catch (_: CancellationException) { - // Ignored - } catch (cause: Throwable) { - exitMachine(cause) - } - } - - setNode(node.copy(state = NodeState.BOOT)) - return nodeState.value - } - - private fun initMachine() { - setNode(nodeState.value.copy(state = NodeState.ACTIVE)) - } - - private fun exitMachine(cause: Throwable?) { - val newNodeState = - if (cause == null) - NodeState.SHUTOFF - else - NodeState.ERROR - setNode(nodeState.value.copy(state = newNodeState)) - } - - override suspend fun stop(): Node { - val node = nodeState.value - if (node.state == NodeState.SHUTOFF) { - return node - } - - job?.cancelAndJoin() - setNode(node.copy(state = NodeState.SHUTOFF)) - return node - } - - override suspend fun reboot(): Node { - stop() - return start() - } - - override suspend fun setImage(image: Image): Node { - setNode(nodeState.value.copy(image = image)) - return nodeState.value - } - - override suspend fun refresh(): Node = nodeState.value - - private fun setNode(value: Node) { - val field = nodeState.value - if (field.state != value.state) { - events.emit(NodeEvent.StateChanged(value, field.state)) - } - - nodeState.value = value - } - - override val scope: CoroutineScope - get() = coroutineScope - - override suspend fun fail() { - setNode(nodeState.value.copy(state = NodeState.ERROR)) - } - - override suspend fun recover() { - setNode(nodeState.value.copy(state = NodeState.ACTIVE)) - } - - override fun toString(): String = "SimBareMetalDriver(node = ${nodeState.value.uid})" -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index fd547d3d..19fa3e97 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -29,34 +29,43 @@ import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* -import org.opendc.metal.Node +import org.opendc.compute.simulator.power.api.CpuPowerModel +import org.opendc.compute.simulator.power.api.Powerable +import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.simulator.compute.* import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.failures.FailureDomain import org.opendc.utils.flow.EventFlow +import java.time.Clock import java.util.* +import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. */ public class SimHost( - public val node: Node, - private val coroutineScope: CoroutineScope, - hypervisor: SimHypervisorProvider -) : Host, SimWorkload { + override val uid: UUID, + override val name: String, + model: SimMachineModel, + override val meta: Map<String, Any>, + context: CoroutineContext, + clock: Clock, + hypervisor: SimHypervisorProvider, + cpuPowerModel: CpuPowerModel = ConstantPowerModel(0.0), + private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper() +) : Host, FailureDomain, Powerable, AutoCloseable { /** - * The logger instance of this server. + * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ - private val logger = KotlinLogging.logger {} + override val scope: CoroutineScope = CoroutineScope(context) /** - * The execution context in which the [Host] runs. + * The logger instance of this server. */ - private lateinit var ctx: SimExecutionContext + private val logger = KotlinLogging.logger {} override val events: Flow<HostEvent> get() = _events @@ -70,12 +79,17 @@ public class SimHost( /** * Current total memory use of the images on this hypervisor. */ - private var availableMemory: Long = 0 + private var availableMemory: Long = model.memory.map { it.size }.sum() + + /** + * The machine to run on. + */ + public val machine: SimBareMetalMachine = SimBareMetalMachine(scope, clock, model) /** * The hypervisor to run multiple workloads. */ - private val hypervisor = hypervisor.create( + public val hypervisor: SimHypervisor = hypervisor.create( object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, @@ -107,26 +121,40 @@ public class SimHost( */ private val guests = HashMap<Server, Guest>() - override val uid: UUID - get() = node.uid - - override val name: String - get() = node.name - - override val model: HostModel - get() = HostModel(node.flavor.cpuCount, node.flavor.memorySize) - override val state: HostState get() = _state - private var _state: HostState = HostState.UP + private var _state: HostState = HostState.DOWN set(value) { - listeners.forEach { it.onStateChanged(this, value) } + if (value != field) { + listeners.forEach { it.onStateChanged(this, value) } + } field = value } + override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) + + override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this) + + init { + // Launch hypervisor onto machine + scope.launch { + try { + _state = HostState.UP + machine.run(this@SimHost.hypervisor, emptyMap()) + } catch (_: CancellationException) { + // Ignored + } catch (cause: Throwable) { + logger.error(cause) { "Host failed" } + throw cause + } finally { + _state = HostState.DOWN + } + } + } + override fun canFit(server: Server): Boolean { val sufficientMemory = availableMemory > server.flavor.memorySize - val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount + val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) return sufficientMemory && enoughCpus && canFit @@ -146,7 +174,7 @@ public class SimHost( guest.start() } - _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override fun contains(server: Server): Boolean { @@ -163,7 +191,7 @@ public class SimHost( guest.stop() } - override suspend fun terminate(server: Server) { + override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return guest.terminate() } @@ -176,11 +204,16 @@ public class SimHost( listeners.remove(listener) } + override fun close() { + scope.cancel() + _state = HostState.DOWN + } + /** * Convert flavor to machine model. */ private fun Flavor.toMachineModel(): SimMachineModel { - val originalCpu = ctx.machine.cpus[0] + val originalCpu = machine.model.cpus[0] val processingNode = originalCpu.node.copy(coreCount = cpuCount) val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) @@ -190,7 +223,7 @@ public class SimHost( private fun onGuestStart(vm: Guest) { guests.forEach { _, guest -> - if (guest.state == ServerState.ACTIVE) { + if (guest.state == ServerState.RUNNING) { vm.performanceInterferenceModel?.onStart(vm.server.image.name) } } @@ -200,58 +233,71 @@ public class SimHost( private fun onGuestStop(vm: Guest) { guests.forEach { _, guest -> - if (guest.state == ServerState.ACTIVE) { + if (guest.state == ServerState.RUNNING) { vm.performanceInterferenceModel?.onStop(vm.server.image.name) } } listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) + } + + override suspend fun fail() { + _state = HostState.DOWN + } + + override suspend fun recover() { + _state = HostState.UP } /** * A virtual machine instance that the driver manages. */ private inner class Guest(val server: Server, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - var state: ServerState = ServerState.SHUTOFF + var state: ServerState = ServerState.TERMINATED suspend fun start() { when (state) { - ServerState.SHUTOFF -> { + ServerState.TERMINATED -> { logger.info { "User requested to start server ${server.uid}" } launch() } - ServerState.ACTIVE -> return + ServerState.RUNNING -> return + ServerState.DELETED -> { + logger.warn { "User tried to start terminated server" } + throw IllegalArgumentException("Server is terminated") + } else -> assert(false) { "Invalid state transition" } } } suspend fun stop() { when (state) { - ServerState.ACTIVE, ServerState.ERROR -> { + ServerState.RUNNING, ServerState.ERROR -> { val job = job ?: throw IllegalStateException("Server should be active") job.cancel() job.join() } - ServerState.SHUTOFF -> return + ServerState.TERMINATED, ServerState.DELETED -> return else -> assert(false) { "Invalid state transition" } } } suspend fun terminate() { stop() + state = ServerState.DELETED } private var job: Job? = null private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> assert(job == null) { "Concurrent job running" } - val workload = server.image.tags["workload"] as SimWorkload + val workload = mapper.createWorkload(server) - val job = coroutineScope.launch { + val job = scope.launch { delay(1) // TODO Introduce boot time init() cont.resume(Unit) @@ -271,14 +317,14 @@ public class SimHost( } private fun init() { - state = ServerState.ACTIVE + state = ServerState.RUNNING onGuestStart(this) } private fun exit(cause: Throwable?) { state = if (cause == null) - ServerState.SHUTOFF + ServerState.TERMINATED else ServerState.ERROR @@ -286,18 +332,4 @@ public class SimHost( onGuestStop(this) } } - - override fun onStart(ctx: SimExecutionContext) { - this.ctx = ctx - this.availableMemory = ctx.machine.memory.map { it.size }.sum() - this.hypervisor.onStart(ctx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return hypervisor.onStart(ctx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return hypervisor.onNext(ctx, cpu, remainingWork) - } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt deleted file mode 100644 index bb03777b..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.* -import org.opendc.compute.api.Image -import org.opendc.compute.service.driver.Host -import org.opendc.metal.Node -import org.opendc.metal.service.ProvisioningService -import org.opendc.simulator.compute.SimHypervisorProvider -import kotlin.coroutines.CoroutineContext - -/** - * A helper class to provision [SimHost]s on top of bare-metal machines using the [ProvisioningService]. - * - * @param context The [CoroutineContext] to use. - * @param metal The [ProvisioningService] to use. - * @param hypervisor The type of hypervisor to use. - */ -public class SimHostProvisioner( - private val context: CoroutineContext, - private val metal: ProvisioningService, - private val hypervisor: SimHypervisorProvider -) : AutoCloseable { - /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context) - - /** - * Provision all machines with a host. - */ - public suspend fun provisionAll(): List<Host> = coroutineScope { - metal.nodes().map { node -> async { provision(node) } }.awaitAll() - } - - /** - * Provision the specified [Node]. - */ - public suspend fun provision(node: Node): Host = coroutineScope { - val host = SimHost(node, scope, hypervisor) - metal.deploy(node, Image(node.uid, node.name, mapOf("workload" to host))) - host - } - - override fun close() { - scope.cancel() - } -} diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt index 30ce423c..c05f1a2c 100644 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt @@ -20,22 +20,16 @@ * SOFTWARE. */ -package org.opendc.metal +package org.opendc.compute.simulator + +import org.opendc.compute.api.Server +import org.opendc.simulator.compute.workload.SimWorkload /** - * An event that is emitted by a [Node]. + * A [SimWorkloadMapper] that maps a [Server] to a workload via the meta-data. */ -public sealed class NodeEvent { - /** - * The node that emitted the event. - */ - public abstract val node: Node - - /** - * This event is emitted when the state of [node] changes. - * - * @property node The node of which the state changed. - * @property previousState The previous state of the node. - */ - public data class StateChanged(override val node: Node, val previousState: NodeState) : NodeEvent() +public class SimMetaWorkloadMapper(private val key: String = "workload") : SimWorkloadMapper { + override fun createWorkload(server: Server): SimWorkload { + return requireNotNull(server.meta[key] ?: server.image.meta[key]) as SimWorkload + } } diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/Resource.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt index 5bb2c2ce..7082c5cf 100644 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/Resource.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,17 @@ * SOFTWARE. */ -package org.opendc.core.resource +package org.opendc.compute.simulator -import org.opendc.core.Identity +import org.opendc.compute.api.Server +import org.opendc.simulator.compute.workload.SimWorkload /** - * Represents a generic cloud resource. + * A [SimWorkloadMapper] is responsible for mapping a [Server] and [Image] to a [SimWorkload] that can be simulated. */ -public interface Resource : Identity { +public fun interface SimWorkloadMapper { /** - * The tags of this cloud resource. + * Map the specified [server] to a [SimWorkload] that can be simulated. */ - public val tags: TagContainer + public fun createWorkload(server: Server): SimWorkload } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt index 0141bc8c..604b69c0 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt @@ -2,7 +2,7 @@ package org.opendc.compute.simulator.power.api import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map -import org.opendc.metal.driver.BareMetalDriver +import org.opendc.compute.simulator.SimHost public interface CpuPowerModel { /** @@ -18,14 +18,14 @@ public interface CpuPowerModel { /** * Emits the values of power consumption for servers. * - * @param driver A [BareMetalDriver] that offers host CPU utilization. + * @param host A [SimHost] that offers host CPU utilization. * @param withoutIdle A [Boolean] flag indicates whether (false) add a constant * power consumption value when the server is idle or (true) not * with a default value being false. * @return A [Flow] of values representing the server power draw. */ - public fun getPowerDraw(driver: BareMetalDriver, withoutIdle: Boolean = false): Flow<Double> = - driver.usage.map { + public fun getPowerDraw(host: SimHost, withoutIdle: Boolean = false): Flow<Double> = + host.machine.usage.map { computeCpuPower(it) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt deleted file mode 100644 index 0d90376e..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.withContext -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Image -import org.opendc.metal.NodeEvent -import org.opendc.metal.NodeState -import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.util.UUID - -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimBareMetalDriverTest { - private lateinit var machineModel: SimMachineModel - - @BeforeEach - fun setUp() { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - - machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 2000.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } - ) - } - - @Test - fun testFlopsWorkload() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - var finalState: NodeState = NodeState.UNKNOWN - var finalTime = 0L - - testScope.launch { - val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) - val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to SimFlopsWorkload(4_000, utilization = 1.0))) - // Batch driver commands - withContext(coroutineContext) { - driver.init() - driver.setImage(image) - val node = driver.start() - node.events.collect { event -> - when (event) { - is NodeEvent.StateChanged -> { - finalState = event.node.state - finalTime = clock.millis() - } - } - } - } - } - - testScope.advanceUntilIdle() - assertAll( - { assertEquals(NodeState.SHUTOFF, finalState) }, - { assertEquals(501, finalTime) } - ) - } -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 61bff39f..e1a1d87e 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,7 +24,6 @@ package org.opendc.compute.simulator import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -39,8 +38,6 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher import org.opendc.compute.service.driver.HostEvent -import org.opendc.metal.Node -import org.opendc.metal.NodeState import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -82,18 +79,13 @@ internal class SimHostTest { var grantedWork = 0L var overcommittedWork = 0L - val node = Node( - UUID.randomUUID(), "name", emptyMap(), NodeState.SHUTOFF, - Flavor(machineModel.cpus.size, machineModel.memory.map { it.size }.sum()), Image.EMPTY, emptyFlow() - ) - scope.launch { - val virtDriver = SimHost(node, this, SimFairShareHypervisorProvider()) - val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver)) + val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider()) val duration = 5 * 60L - val vmImageA = Image( + val vmImageA = MockImage( UUID.randomUUID(), "<unnamed>", + emptyMap(), mapOf( "workload" to SimTraceWorkload( sequenceOf( @@ -105,9 +97,10 @@ internal class SimHostTest { ) ) ) - val vmImageB = Image( + val vmImageB = MockImage( UUID.randomUUID(), "<unnamed>", + emptyMap(), mapOf( "workload" to SimTraceWorkload( sequenceOf( @@ -120,16 +113,9 @@ internal class SimHostTest { ) ) - val metalDriver = - SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) - - metalDriver.init() - metalDriver.setImage(vmm) - metalDriver.start() - delay(5) - val flavor = Flavor(2, 0) + val flavor = MockFlavor(2, 0) virtDriver.events .onEach { event -> when (event) { @@ -157,14 +143,56 @@ internal class SimHostTest { ) } + private class MockFlavor( + override val cpuCount: Int, + override val memorySize: Long + ) : Flavor { + override val uid: UUID = UUID.randomUUID() + override val name: String = "test" + override val labels: Map<String, String> = emptyMap() + override val meta: Map<String, Any> = emptyMap() + + override suspend fun delete() { + throw NotImplementedError() + } + + override suspend fun refresh() { + throw NotImplementedError() + } + } + + private class MockImage( + override val uid: UUID, + override val name: String, + override val labels: Map<String, String>, + override val meta: Map<String, Any> + ) : Image { + override suspend fun delete() { + throw NotImplementedError() + } + + override suspend fun refresh() { + throw NotImplementedError() + } + } + private class MockServer( override val uid: UUID, override val name: String, override val flavor: Flavor, override val image: Image ) : Server { - override val tags: Map<String, String> = emptyMap() - override val state: ServerState = ServerState.BUILD + override val labels: Map<String, String> = emptyMap() + + override val meta: Map<String, Any> = emptyMap() + + override val state: ServerState = ServerState.TERMINATED + + override suspend fun start() {} + + override suspend fun stop() {} + + override suspend fun delete() {} override fun watch(watcher: ServerWatcher) {} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt deleted file mode 100644 index 33b3db94..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.opendc.compute.api.Image -import org.opendc.metal.service.SimpleProvisioningService -import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.util.UUID - -/** - * Test suite for the [SimpleProvisioningService]. - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimProvisioningServiceTest { - private lateinit var machineModel: SimMachineModel - - @BeforeEach - fun setUp() { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - - machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 2000.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } - ) - } - - /** - * A basic smoke test. - */ - @Test - fun testSmoke() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - testScope.launch { - val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("machine" to SimFlopsWorkload(1000))) - val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) - - val provisioner = SimpleProvisioningService() - provisioner.create(driver) - delay(5) - val nodes = provisioner.nodes() - val node = provisioner.deploy(nodes.first(), image) - node.events.collect { println(it) } - } - - testScope.advanceUntilIdle() - } -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt index d4d88fb1..9d034a5d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt @@ -1,18 +1,21 @@ package org.opendc.compute.simulator.power import io.mockk.* +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.* -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource +import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.power.api.CpuPowerModel import org.opendc.compute.simulator.power.models.* -import org.opendc.metal.driver.BareMetalDriver +import org.opendc.simulator.compute.SimBareMetalMachine import java.util.stream.Stream import kotlin.math.pow +@OptIn(ExperimentalCoroutinesApi::class) internal class CpuPowerModelTest { private val epsilon = 10.0.pow(-3) private val cpuUtil = .9 @@ -44,21 +47,24 @@ internal class CpuPowerModelTest { powerModel: CpuPowerModel, expectedPowerConsumption: Double ) { - val cpuLoads = flowOf(cpuUtil, cpuUtil, cpuUtil) - val bareMetalDriver = mockkClass(BareMetalDriver::class) - every { bareMetalDriver.usage } returns cpuLoads + runBlockingTest { + val cpuLoads = flowOf(cpuUtil, cpuUtil, cpuUtil).stateIn(this) + val bareMetalDriver = mockkClass(SimHost::class) + val machine = mockkClass(SimBareMetalMachine::class) + every { bareMetalDriver.machine } returns machine + every { machine.usage } returns cpuLoads - runBlocking { val serverPowerDraw = powerModel.getPowerDraw(bareMetalDriver) - assertEquals(serverPowerDraw.count(), cpuLoads.count()) assertEquals( serverPowerDraw.first().toDouble(), flowOf(expectedPowerConsumption).first().toDouble(), epsilon ) + + verify(exactly = 1) { bareMetalDriver.machine } + verify(exactly = 1) { machine.usage } } - verify(exactly = 1) { bareMetalDriver.usage } } @Suppress("unused") diff --git a/simulator/opendc-core/build.gradle.kts b/simulator/opendc-core/build.gradle.kts deleted file mode 100644 index 7e1a4b97..00000000 --- a/simulator/opendc-core/build.gradle.kts +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2017 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Base model for datacenter simulation" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(platform(project(":opendc-platform"))) - api("org.jetbrains.kotlinx:kotlinx-coroutines-core") -} diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt deleted file mode 100644 index a5055cff..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core - -/** - * A description of a large-scale computing environment. This description includes including key size and topology - * information of the environment, types of resources, but also various operational and management rules such as - * scheduled maintenance, allocation and other constraints. - * - * @property name The name of the environment. - * @property description A small textual description about the environment that is being modeled. - * @property platforms The cloud platforms (such as AWS or GCE) in this environment. - */ -public data class Environment(val name: String, val description: String?, val platforms: List<Platform>) diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt deleted file mode 100644 index 5550ffed..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core - -import java.util.* - -/** - * A representation of a cloud platform such as Amazon Web Services (AWS), Microsoft Azure or Google Cloud. - * - * @property uid The unique identifier of this topology. - * @property name the name of the platform. - * @property zones The availability zones available on this platform. - */ -public data class Platform(override val uid: UUID, override val name: String, val zones: List<Zone>) : Identity diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt deleted file mode 100644 index 834f6cf2..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core - -import org.opendc.core.services.ServiceRegistry -import java.util.* - -/** - * An isolated location within a topology region from which public cloud services operate, roughly equivalent to a - * single topology. Zones contain one or more clusters and secondary storage. - * - * This class models *only* the static information of a zone, with dynamic information being contained within the zone's - * actor. During runtime, it's actor acts as a registry for all the cloud services provided by the zone. - * - * @property uid The unique identifier of this availability zone. - * @property name The name of the zone within its platform. - * @property services The service registry containing the services of the zone. - */ -public data class Zone( - override val uid: UUID, - override val name: String, - val services: ServiceRegistry -) : Identity { - override fun equals(other: Any?): Boolean = other is Zone && uid == other.uid - override fun hashCode(): Int = uid.hashCode() -} diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt deleted file mode 100644 index 6a4ff102..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core.resource - -/** - * An immutable map containing the tags of some resource. - */ -public typealias TagContainer = Map<String, Any> - -/** - * Obtain the value of the tag with the specified [key] of type [T]. If the tag does not exist or the tag is of - * different type, `null` is returned. - */ -public inline fun <reified T : Any> TagContainer.typed(key: String): T? = this[key] as? T diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt deleted file mode 100644 index 7434d91c..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core.services - -/** - * An immutable service registry interface. - */ -public interface ServiceRegistry { - /** - * The keys in this registry. - */ - public val keys: Collection<ServiceKey<*>> - - /** - * Determine if this map contains the service with the specified [ServiceKey]. - * - * @param key The key of the service to check for. - * @return `true` if the service is in the map, `false` otherwise. - */ - public operator fun contains(key: ServiceKey<*>): Boolean - - /** - * Obtain the service with the specified [ServiceKey]. - * - * @param key The key of the service to obtain. - * @return The references to the service. - * @throws IllegalArgumentException if the key does not exist in the map. - */ - public operator fun <T : Any> get(key: ServiceKey<T>): T - - /** - * Return the result of associating the specified [service] with the given [key] in this registry. - */ - public fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry -} - -/** - * Construct an empty [ServiceRegistry]. - */ -@Suppress("FunctionName") -public fun ServiceRegistry(): ServiceRegistry = ServiceRegistryImpl(emptyMap()) diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt deleted file mode 100644 index e117bec6..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core.services - -/** - * Default implementation of the [ServiceRegistry] interface. - */ -internal class ServiceRegistryImpl(private val map: Map<ServiceKey<*>, Any>) : ServiceRegistry { - override val keys: Collection<ServiceKey<*>> - get() = map.keys - - override fun contains(key: ServiceKey<*>): Boolean = key in map - - override fun <T : Any> get(key: ServiceKey<T>): T { - @Suppress("UNCHECKED_CAST") - return map[key] as T - } - - override fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry = - ServiceRegistryImpl(map.plus(key to service)) - - override fun toString(): String = map.toString() -} diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt deleted file mode 100644 index f0bd1137..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core.workload - -import org.opendc.core.Identity -import org.opendc.core.User - -/** - * A high-level abstraction that represents the actual work that a set of compute resources perform, such - * as running an application on a machine or a whole workflow running multiple tasks on numerous machines. - */ -public interface Workload : Identity { - /** - * The owner of this workload. - */ - public val owner: User -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 636f291c..2d0da1bf 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -31,7 +31,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-harness")) implementation(project(":opendc-format")) implementation(project(":opendc-simulator:opendc-simulator-core")) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index a5cf4fc0..f327b55d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.experiment +package org.opendc.experiments.capelin import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -32,30 +32,24 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.driver.HostListener +import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader -import org.opendc.metal.NODE_CLUSTER -import org.opendc.metal.NodeEvent -import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector -import org.opendc.simulator.failures.FailureDomain import org.opendc.simulator.failures.FaultInjector import org.opendc.trace.core.EventTracer import java.io.File @@ -72,20 +66,20 @@ private val logger = KotlinLogging.logger {} /** * Construct the failure domain for the experiments. */ -public suspend fun createFailureDomain( +public fun createFailureDomain( coroutineScope: CoroutineScope, clock: Clock, seed: Int, failureInterval: Double, - bareMetalProvisioner: ProvisioningService, + service: ComputeService, chan: Channel<Unit> ): CoroutineScope { val job = coroutineScope.launch { chan.receive() val random = Random(seed) val injectors = mutableMapOf<String, FaultInjector>() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String + for (host in service.hosts) { + val cluster = host.meta["cluster"] as String val injector = injectors.getOrPut(cluster) { createFaultInjector( @@ -95,7 +89,7 @@ public suspend fun createFailureDomain( failureInterval ) } - injector.enqueue(node.metadata["driver"] as FailureDomain) + injector.enqueue(host as SimHost) } } return CoroutineScope(coroutineScope.coroutineContext + job) @@ -139,41 +133,39 @@ public fun createTraceReader( ) } -public data class ProvisionerResult( - val metal: ProvisioningService, - val provisioner: SimHostProvisioner, - val compute: ComputeServiceImpl -) - /** - * Construct the environment for a VM provisioner and return the provisioner instance. + * Construct the environment for a simulated compute service.. */ -public suspend fun createProvisioner( +public fun createComputeService( coroutineScope: CoroutineScope, clock: Clock, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, eventTracer: EventTracer -): ProvisionerResult { - val environment = environmentReader.use { it.construct(coroutineScope, clock) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider()) - val hosts = provisioner.provisionAll() +): ComputeServiceImpl { + val hosts = environmentReader + .use { it.read() } + .map { def -> + SimHost( + def.uid, + def.name, + def.model, + def.meta, + coroutineScope.coroutineContext, + clock, + SimFairShareHypervisorProvider(), + def.powerModel + ) + } - val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + val scheduler = + ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl for (host in hosts) { scheduler.addHost(host) } - // Wait for the hypervisors to be spawned - delay(10) - - return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler) + return scheduler } /** @@ -186,25 +178,16 @@ public fun attachMonitor( scheduler: ComputeService, monitor: ExperimentMonitor ) { - - val hypervisors = scheduler.hosts - - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose Host directly but use Hypervisor class. - val server = (hypervisor as SimHost).node - monitor.reportHostStateChange(clock.millis(), hypervisor, server) - server.events - .onEach { event -> - val time = clock.millis() - when (event) { - is NodeEvent.StateChanged -> { - monitor.reportHostStateChange(time, hypervisor, event.node) - } - } + // Monitor host events + for (host in scheduler.hosts) { + monitor.reportHostStateChange(clock.millis(), host, HostState.UP) + host.addListener(object : HostListener { + override fun onStateChanged(host: Host, newState: HostState) { + monitor.reportHostStateChange(clock.millis(), host, newState) } - .launchIn(coroutineScope) - hypervisor.events + }) + + host.events .onEach { event -> when (event) { is HostEvent.SliceFinished -> monitor.reportHostSlice( @@ -216,15 +199,14 @@ public fun attachMonitor( event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - (event.driver as SimHost).node + event.driver ) } } .launchIn(coroutineScope) - val driver = server.metadata["driver"] as SimBareMetalDriver - driver.powerDraw - .onEach { monitor.reportPowerConsumption(server, it) } + (host as SimHost).powerDraw + .onEach { monitor.reportPowerConsumption(host, it) } .launchIn(coroutineScope) } @@ -244,29 +226,32 @@ public fun attachMonitor( public suspend fun processTrace( coroutineScope: CoroutineScope, clock: Clock, - reader: TraceReader<ComputeWorkload>, + reader: TraceReader<SimWorkload>, scheduler: ComputeService, chan: Channel<Unit>, monitor: ExperimentMonitor ) { val client = scheduler.newClient() + val image = client.newImage("vm-image") try { var submitted = 0 while (reader.hasNext()) { - val (time, workload) = reader.next() + val entry = reader.next() submitted++ - delay(max(0, time - clock.millis())) + delay(max(0, entry.start - clock.millis())) coroutineScope.launch { chan.send(Unit) val server = client.newServer( - workload.image.name, - workload.image, - Flavor( - workload.image.tags["cores"] as Int, - workload.image.tags["required-memory"] as Long - ) + entry.name, + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta ) server.watch(object : ServerWatcher { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index ff0a026d..f9c96bb6 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -35,10 +35,6 @@ import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolic import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* -import org.opendc.experiments.capelin.experiment.attachMonitor -import org.opendc.experiments.capelin.experiment.createFailureDomain -import org.opendc.experiments.capelin.experiment.createProvisioner -import org.opendc.experiments.capelin.experiment.processTrace import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -157,7 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { ) testScope.launch { - val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( + val scheduler = createComputeService( this, clock, environment, @@ -172,7 +168,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { clock, seeder.nextInt(), operationalPhenomena.failureFrequency, - bareMetalProvisioner, + scheduler, chan ) } else { @@ -197,7 +193,6 @@ public abstract class Portfolio(name: String) : Experiment(name) { failureDomain?.cancel() scheduler.close() - provisioner.close() } try { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 1e42cf56..14cc06dc 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -26,7 +26,7 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host -import org.opendc.metal.Node +import org.opendc.compute.service.driver.HostState import java.io.Closeable /** @@ -41,17 +41,12 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked when the state of a host changes. */ - public fun reportHostStateChange( - time: Long, - driver: Host, - host: Node - ) { - } + public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {} /** * Report the power consumption of a host. */ - public fun reportPowerConsumption(host: Node, draw: Double) {} + public fun reportPowerConsumption(host: Host, draw: Double) {} /** * This method is invoked for a host for each slice that is finishes. @@ -65,7 +60,7 @@ public interface ExperimentMonitor : Closeable { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - host: Node, + host: Host, duration: Long = 5 * 60 * 1000L ) { } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index 98052214..c9d57a98 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -27,11 +27,11 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.telemetry.HostEvent import org.opendc.experiments.capelin.telemetry.ProvisionerEvent import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter -import org.opendc.metal.Node import java.io.File /** @@ -51,7 +51,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: File(base, "provisioner-metrics/$partition/data.parquet"), bufferSize ) - private val currentHostEvent = mutableMapOf<Node, HostEvent>() + private val currentHostEvent = mutableMapOf<Host, HostEvent>() private var startTime = -1L override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { @@ -63,12 +63,8 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } } - override fun reportHostStateChange( - time: Long, - driver: Host, - host: Node - ) { - logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } + override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { + logger.debug { "Host ${host.uid} changed state $newState [$time]" } val previousEvent = currentHostEvent[host] @@ -97,9 +93,9 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: ) } - private val lastPowerConsumption = mutableMapOf<Node, Double>() + private val lastPowerConsumption = mutableMapOf<Host, Double>() - override fun reportPowerConsumption(host: Node, draw: Double) { + override fun reportPowerConsumption(host: Host, draw: Double) { lastPowerConsumption[host] = draw } @@ -112,7 +108,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - host: Node, + host: Host, duration: Long ) { val previousEvent = currentHostEvent[host] @@ -130,7 +126,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -148,7 +144,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -168,7 +164,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt index e7b6a7bb..899fc9b1 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.telemetry -import org.opendc.metal.Node +import org.opendc.compute.service.driver.Host /** * A periodic report of the host machine metrics. @@ -30,7 +30,7 @@ import org.opendc.metal.Node public data class HostEvent( override val timestamp: Long, public val duration: Long, - public val node: Node, + public val host: Host, public val vmCount: Int, public val requestedBurst: Long, public val grantedBurst: Long, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt index b4fdd66a..4a3e7963 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : // record.put("portfolio_id", event.run.parent.parent.id) // record.put("scenario_id", event.run.parent.id) // record.put("run_id", event.run.id) - record.put("host_id", event.node.name) - record.put("state", event.node.state.name) + record.put("host_id", event.host.name) + record.put("state", event.host.state.name) record.put("timestamp", event.timestamp) record.put("duration", event.duration) record.put("vm_count", event.vmCount) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt index f9630078..a8462a51 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt @@ -22,14 +22,13 @@ package org.opendc.experiments.capelin.trace -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimWorkload import java.util.TreeSet /** @@ -45,11 +44,11 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, workload: Workload, seed: Int -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The iterator over the actual trace. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> = + private val iterator: Iterator<TraceEntry<SimWorkload>> = rawReaders .map { it.read() } .run { @@ -67,19 +66,11 @@ public class Sc20ParquetTraceReader( this else { map { entry -> - val image = entry.workload.image - val id = image.name + val id = entry.name val relevantPerformanceInterferenceModelItems = performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - val newImage = - Image( - image.uid, - image.name, - image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - ) - val newWorkload = entry.workload.copy(image = newImage) - Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) + entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems)) } } } @@ -87,7 +78,7 @@ public class Sc20ParquetTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt index b29bdc54..7ea5efe5 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -26,12 +26,10 @@ import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.File import java.util.UUID @@ -48,6 +46,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { * Read the fragments into memory. */ private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> { + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet")) .disableCompatibility() .build() @@ -59,11 +58,9 @@ public class Sc20RawParquetTraceReader(private val path: File) { val record = reader.read() ?: break val id = record["id"].toString() - val tick = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( duration, @@ -83,13 +80,14 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntryImpl> { + private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { + @Suppress("DEPRECATION") val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet")) .disableCompatibility() .build() var counter = 0 - val entries = mutableListOf<TraceEntryImpl>() + val entries = mutableListOf<TraceEntry<SimWorkload>>() return try { while (true) { @@ -109,13 +107,9 @@ public class Sc20RawParquetTraceReader(private val path: File) { val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs val workload = SimTraceWorkload(vmFragments) - val vmWorkload = ComputeWorkload( - uid, - id, - UnnamedUser, - Image( - uid, - id, + entries.add( + TraceEntry( + uid, id, submissionTime, workload, mapOf( "submit-time" to submissionTime, "end-time" to endTime, @@ -126,7 +120,6 @@ public class Sc20RawParquetTraceReader(private val path: File) { ) ) ) - entries.add(TraceEntryImpl(submissionTime, vmWorkload)) } entries @@ -141,7 +134,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * The entries in the trace. */ - private val entries: List<TraceEntryImpl> + private val entries: List<TraceEntry<SimWorkload>> init { val fragments = parseFragments(path) @@ -151,21 +144,5 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the entries in the trace. */ - public fun read(): List<TraceEntry<ComputeWorkload>> = entries - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - internal data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> + public fun read(): List<TraceEntry<SimWorkload>> = entries } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index c588fda3..9ab69572 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -31,14 +31,12 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.File import java.io.Serializable import java.util.SortedSet @@ -62,11 +60,11 @@ public class Sc20StreamingParquetTraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * The intermediate buffer to store the read records in. @@ -98,6 +96,7 @@ public class Sc20StreamingParquetTraceReader( * The thread to read the records in. */ private val readerThread = thread(start = true, name = "sc20-reader") { + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } @@ -113,11 +112,9 @@ public class Sc20StreamingParquetTraceReader( } val id = record["id"].toString() - val tick = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( duration, @@ -167,6 +164,7 @@ public class Sc20StreamingParquetTraceReader( val entries = mutableMapOf<String, GenericData.Record>() val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>() + @Suppress("DEPRECATION") val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } @@ -236,35 +234,25 @@ public class Sc20StreamingParquetTraceReader( Random(random.nextInt()) ) val workload = SimTraceWorkload(fragments) - val vmWorkload = ComputeWorkload( - uid, - "VM Workload $id", - UnnamedUser, - Image( - uid, - id, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - TraceEntryImpl( - submissionTime, - vmWorkload + TraceEntry( + uid, id, submissionTime, workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to maxCores, + "required-memory" to requiredMemory, + "workload" to workload + ) ) } - .sortedBy { it.submissionTime } + .sortedBy { it.start } .toList() .iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() { readerThread.interrupt() @@ -287,20 +275,4 @@ public class Sc20StreamingParquetTraceReader( return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() } } - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index 881652f6..5c8727ea 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -23,12 +23,11 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry +import org.opendc.simulator.compute.workload.SimWorkload import java.util.* import kotlin.random.Random @@ -38,11 +37,11 @@ private val logger = KotlinLogging.logger {} * Sample the workload for the specified [run]. */ public fun sampleWorkload( - trace: List<TraceEntry<ComputeWorkload>>, + trace: List<TraceEntry<SimWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<ComputeWorkload>> { +): List<TraceEntry<SimWorkload>> { return when { workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) workload.samplingStrategy == SamplingStrategy.HPC -> @@ -58,24 +57,24 @@ public fun sampleWorkload( * Sample a regular (non-HPC) workload. */ public fun sampleRegularWorkload( - trace: List<TraceEntry<ComputeWorkload>>, + trace: List<TraceEntry<SimWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<ComputeWorkload>> { +): List<TraceEntry<SimWorkload>> { val fraction = subWorkload.fraction val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() val totalLoad = if (workload is CompositeWorkload) { workload.totalLoad } else { - shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + shuffled.sumByDouble { it.meta.getValue("total-load") as Double } } var currentLoad = 0.0 for (entry in shuffled) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + val entryLoad = entry.meta.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -93,23 +92,23 @@ public fun sampleRegularWorkload( * Sample a HPC workload. */ public fun sampleHpcWorkload( - trace: List<TraceEntry<ComputeWorkload>>, + trace: List<TraceEntry<SimWorkload>>, workload: Workload, seed: Int, sampleOnLoad: Boolean -): List<TraceEntry<ComputeWorkload>> { +): List<TraceEntry<SimWorkload>> { val pattern = Regex("^vm__workload__(ComputeNode|cn).*") val random = Random(seed) val fraction = workload.fraction val (hpc, nonHpc) = trace.partition { entry -> - val name = entry.workload.image.name + val name = entry.name name.matches(pattern) } val hpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() hpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -118,7 +117,7 @@ public fun sampleHpcWorkload( val nonHpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() nonHpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -130,7 +129,7 @@ public fun sampleHpcWorkload( val totalLoad = if (workload is CompositeWorkload) { workload.totalLoad } else { - trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + trace.sumByDouble { it.meta.getValue("total-load") as Double } } logger.debug { "Total trace load: $totalLoad" } @@ -139,12 +138,12 @@ public fun sampleHpcWorkload( var nonHpcCount = 0 var nonHpcLoad = 0.0 - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() if (sampleOnLoad) { var currentLoad = 0.0 for (entry in hpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + val entryLoad = entry.meta.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -156,7 +155,7 @@ public fun sampleHpcWorkload( } for (entry in nonHpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + val entryLoad = entry.meta.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > 1) { break } @@ -170,7 +169,7 @@ public fun sampleHpcWorkload( hpcSequence .take((fraction * trace.size).toInt()) .forEach { entry -> - hpcLoad += entry.workload.image.tags.getValue("total-load") as Double + hpcLoad += entry.meta.getValue("total-load") as Double hpcCount += 1 res.add(entry) } @@ -178,7 +177,7 @@ public fun sampleHpcWorkload( nonHpcSequence .take(((1 - fraction) * trace.size).toInt()) .forEach { entry -> - nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double + nonHpcLoad += entry.meta.getValue("total-load") as Double nonHpcCount += 1 res.add(entry) } @@ -194,16 +193,7 @@ public fun sampleHpcWorkload( /** * Sample a random trace entry. */ -private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> { - val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = Image( - id, - entry.workload.image.name, - entry.workload.image.tags - ) - val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) - return VmTraceEntry(vmWorkload, entry.submissionTime) +private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> { + val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) + return entry.copy(uid = uid) } - -private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) : - TraceEntry<ComputeWorkload> diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index dfc6b90b..4e6cfddc 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,13 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy -import org.opendc.experiments.capelin.experiment.attachMonitor -import org.opendc.experiments.capelin.experiment.createFailureDomain -import org.opendc.experiments.capelin.experiment.createProvisioner -import org.opendc.experiments.capelin.experiment.processTrace import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader @@ -46,7 +42,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader -import org.opendc.metal.Node +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import java.io.File @@ -101,15 +97,13 @@ class CapelinIntegrationTest { val tracer = EventTracer(clock) testScope.launch { - val res = createProvisioner( + scheduler = createComputeService( this, clock, environmentReader, allocationPolicy, tracer ) - val bareMetalProvisioner = res.metal - scheduler = res.compute val failureDomain = if (failures) { println("ENABLING failures") @@ -118,7 +112,7 @@ class CapelinIntegrationTest { clock, seed, 24.0 * 7, - bareMetalProvisioner, + scheduler, chan ) } else { @@ -140,7 +134,6 @@ class CapelinIntegrationTest { failureDomain?.cancel() scheduler.close() monitor.close() - res.provisioner.close() } runSimulation() @@ -166,7 +159,7 @@ class CapelinIntegrationTest { val tracer = EventTracer(clock) testScope.launch { - val (_, provisioner, scheduler) = createProvisioner( + val scheduler = createComputeService( this, clock, environmentReader, @@ -187,7 +180,6 @@ class CapelinIntegrationTest { scheduler.close() monitor.close() - provisioner.close() } runSimulation() @@ -209,7 +201,7 @@ class CapelinIntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> { + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { return Sc20ParquetTraceReader( listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), emptyMap(), @@ -241,7 +233,7 @@ class CapelinIntegrationTest { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - host: Node, + host: Host, duration: Long ) { totalRequestedBurst += requestedBurst diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index 00aa0395..02e77c7c 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -30,10 +30,9 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-harness")) implementation(project(":opendc-format")) - implementation(project(":opendc-workflows")) + implementation(project(":opendc-workflow:opendc-workflow-service")) implementation(project(":opendc-simulator:opendc-simulator-core")) implementation(project(":opendc-compute:opendc-compute-simulator")) } diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 7b9d70ed..9e305b3d 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -26,23 +26,22 @@ import kotlinx.coroutines.* import kotlinx.coroutines.test.TestCoroutineScope import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.simulator.SimHostProvisioner +import org.opendc.compute.simulator.SimHost import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf -import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import org.opendc.trace.core.enable -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.WorkflowEvent -import org.opendc.workflows.service.WorkflowSchedulerMode -import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy -import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy +import org.opendc.workflow.service.WorkflowEvent +import org.opendc.workflow.service.WorkflowService +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy import java.io.File import java.io.FileInputStream import kotlin.math.max @@ -84,16 +83,20 @@ public class UnderspecificationExperiment : Experiment("underspecification") { } testScope.launch { - val environment = Sc18EnvironmentReader(FileInputStream(File(environment))) - .use { it.construct(testScope, clock) } + val hosts = Sc18EnvironmentReader(FileInputStream(File(environment))) + .use { it.read() } + .map { def -> + SimHost( + def.uid, + def.name, + def.model, + def.meta, + testScope.coroutineContext, + clock, + SimSpaceSharedHypervisorProvider() + ) + } - val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) - val hosts = provisioner.provisionAll() val compute = ComputeService( testScope.coroutineContext, clock, @@ -103,11 +106,8 @@ public class UnderspecificationExperiment : Experiment("underspecification") { hosts.forEach { compute.addHost(it) } - // Wait for the hypervisors to be spawned - delay(10) - - val scheduler = StageWorkflowService( - testScope, + val scheduler = WorkflowService( + testScope.coroutineContext, clock, tracer, compute.newClient(), @@ -121,9 +121,9 @@ public class UnderspecificationExperiment : Experiment("underspecification") { val reader = GwfTraceReader(File(trace)) while (reader.hasNext()) { - val (time, job) = reader.next() - delay(max(0, time * 1000 - clock.millis())) - scheduler.submit(job) + val entry = reader.next() + delay(max(0, entry.start * 1000 - clock.millis())) + scheduler.submit(entry.workload) } } diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt index dbd04b87..a8356888 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt @@ -24,7 +24,7 @@ package org.opendc.experiments.sc18 import org.opendc.trace.core.EventStream import org.opendc.trace.core.onEvent -import org.opendc.workflows.service.WorkflowEvent +import org.opendc.workflow.service.WorkflowEvent import java.util.* import kotlin.coroutines.resume import kotlin.coroutines.suspendCoroutine diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts index 37e9c9c8..385e556d 100644 --- a/simulator/opendc-format/build.gradle.kts +++ b/simulator/opendc-format/build.gradle.kts @@ -30,9 +30,8 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-workflows")) + api(project(":opendc-workflow:opendc-workflow-api")) implementation(project(":opendc-simulator:opendc-simulator-compute")) implementation(project(":opendc-compute:opendc-compute-simulator")) api("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt index 1f73bb61..97d6f239 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt @@ -22,17 +22,14 @@ package org.opendc.format.environment -import kotlinx.coroutines.CoroutineScope -import org.opendc.core.Environment import java.io.Closeable -import java.time.Clock /** - * An interface for reading descriptions of topology environments into memory as [Environment]. + * An interface for reading descriptions of topology environments into memory. */ public interface EnvironmentReader : Closeable { /** - * Construct an [Environment] in the specified [CoroutineScope]. + * Read the environment into a list. */ - public suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment + public fun read(): List<MachineDef> } diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Identity.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt index 252c40f5..b5b3b84b 100644 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Identity.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,21 +20,16 @@ * SOFTWARE. */ -package org.opendc.core +package org.opendc.format.environment +import org.opendc.compute.simulator.power.api.CpuPowerModel +import org.opendc.simulator.compute.SimMachineModel import java.util.* -/** - * An object that has a unique identity. - */ -public interface Identity { - /** - * A unique, opaque, system-generated value, representing the object. - */ - public val uid: UUID - - /** - * A non-empty, human-readable string representing the object. - */ - public val name: String -} +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: SimMachineModel, + val powerModel: CpuPowerModel +) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index bbbbe87c..3da8d0b3 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -25,21 +25,14 @@ package org.opendc.format.environment.sc18 import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry +import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream -import java.time.Clock import java.util.* /** @@ -55,9 +48,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + /** + * Read the environment. + */ + public override fun read(): List<MachineDef> { var counter = 0 - val nodes = setup.rooms.flatMap { room -> + return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> when (roomObject) { is RoomObject.Rack -> { @@ -75,35 +71,18 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimBareMetalDriver( - coroutineScope, - clock, - UUID.randomUUID(), - "node-${counter++}", + MachineDef( + UUID(0L, counter++.toLong()), + "node-$counter", emptyMap(), - SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))) + SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), + ConstantPowerModel(0.0) ) } } } } } - - val provisioningService = SimpleProvisioningService() - for (node in nodes) { - provisioningService.create(node) - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - val platform = Platform( - UUID.randomUUID(), - "sc18-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(setup.name, null, listOf(platform)) } override fun close() {} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 998f9cd6..9a06a40f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -22,17 +22,9 @@ package org.opendc.format.environment.sc20 -import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.NODE_CLUSTER -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -40,7 +32,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit import java.io.File import java.io.FileInputStream import java.io.InputStream -import java.time.Clock import java.util.* /** @@ -54,8 +45,7 @@ public class Sc20ClusterEnvironmentReader( public constructor(file: File) : this(FileInputStream(file)) - @Suppress("BlockingMethodInNonBlockingContext") - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + public override fun read(): List<MachineDef> { var clusterIdCol = 0 var speedCol = 0 var numberOfHostsCol = 0 @@ -69,7 +59,7 @@ public class Sc20ClusterEnvironmentReader( var memoryPerHost: Long var coresPerHost: Int - val nodes = mutableListOf<SimBareMetalDriver>() + val nodes = mutableListOf<MachineDef>() val random = Random(0) input.bufferedReader().use { reader -> @@ -103,12 +93,10 @@ public class Sc20ClusterEnvironmentReader( repeat(numberOfHosts) { nodes.add( - SimBareMetalDriver( - coroutineScope, - clock, + MachineDef( UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", - mapOf(NODE_CLUSTER to clusterId), + mapOf("cluster" to clusterId), SimMachineModel( List(coresPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, speed) @@ -125,22 +113,7 @@ public class Sc20ClusterEnvironmentReader( } } - val provisioningService = SimpleProvisioningService() - for (node in nodes) { - provisioningService.create(node) - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "sc20-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment("SC20 Environment", null, listOf(platform)) + return nodes } override fun close() {} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 6cf65f7f..effd0286 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -25,22 +25,14 @@ package org.opendc.format.environment.sc20 import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream -import java.time.Clock import java.util.* /** @@ -55,9 +47,12 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + /** + * Read the environment. + */ + public override fun read(): List<MachineDef> { var counter = 0 - val nodes = setup.rooms.flatMap { room -> + return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> when (roomObject) { is RoomObject.Rack -> { @@ -81,11 +76,9 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimBareMetalDriver( - coroutineScope, - clock, - UUID.randomUUID(), - "node-${counter++}", + MachineDef( + UUID(0L, counter++.toLong()), + "node-$counter", emptyMap(), SimMachineModel(cores, memories), // For now we assume a simple linear load model with an idle draw of ~200W and a maximum @@ -98,23 +91,6 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja } } } - - val provisioningService = SimpleProvisioningService() - for (node in nodes) { - provisioningService.create(node) - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "sc20-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(setup.name, null, listOf(platform)) } override fun close() {} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt index ec547e84..3ce79d69 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt @@ -24,31 +24,21 @@ package org.opendc.format.trace -import org.opendc.core.workload.Workload +import java.util.UUID /** * An entry in a workload trace. * - * @param T The shape of the workload in this entry. + * @param uid The unique identifier of the entry. + * @param name The name of the entry. + * @param start The start time of the workload. + * @param workload The workload of the entry. + * @param meta The meta-data associated with the workload. */ -public interface TraceEntry<T : Workload> { - /** - * The time of submission of the workload. - */ - public val submissionTime: Long - - /** - * The workload in this trace entry. - */ - public val workload: T - - /** - * Extract the submission time from this entry. - */ - public operator fun component1(): Long = submissionTime - - /** - * Extract the workload from this entry. - */ - public operator fun component2(): T = workload -} +public data class TraceEntry<out T>( + val uid: UUID, + val name: String, + val start: Long, + val workload: T, + val meta: Map<String, Any> +) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt index a0beec3e..7df1acd3 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt @@ -22,14 +22,13 @@ package org.opendc.format.trace -import org.opendc.core.workload.Workload import java.io.Closeable /** - * An interface for reading [Workload]s into memory. + * An interface for reading workloads into memory. * * This interface must guarantee that the entries are delivered in order of submission time. * * @param T The shape of the workloads supported by this reader. */ -public interface TraceReader<T : Workload> : Iterator<TraceEntry<T>>, Closeable +public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt deleted file mode 100644 index 54fb6214..00000000 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import org.opendc.core.workload.Workload -import java.io.Closeable - -/** - * An interface for persisting workload traces (e.g. to disk). - * - * @param T The type of [Workload] supported by this writer. - */ -public interface TraceWriter<T : Workload> : Closeable { - /** - * Write an entry to the trace. - * - * Entries must be written in order of submission time. Failing to do so results in a [IllegalArgumentException]. - * - * @param submissionTime The time of submission of the workload. - * @param workload The workload to write to the trace. - */ - public fun write(submissionTime: Long, workload: T) -} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 1571b17d..769b2b13 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -22,14 +22,12 @@ package org.opendc.format.trace.bitbrains -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -45,17 +43,17 @@ import kotlin.math.min public class BitbrainsTraceReader( traceDirectory: File, performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() var timestampCol = 0 var coreCol = 0 @@ -132,50 +130,27 @@ public class BitbrainsTraceReader( ) val workload = SimTraceWorkload(flopsHistory.asSequence()) - val vmWorkload = ComputeWorkload( + entries[vmId] = TraceEntry( uuid, - "VM Workload $vmId", - UnnamedUser, - Image( - uuid, - vmId.toString(), - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - entries[vmId] = TraceEntryImpl( + vmId.toString(), startTime, - vmWorkload + workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory, + "workload" to workload + ) ) } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = entries.values.sortedBy { it.start }.iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index cd7aff3c..e68afeb7 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -22,15 +22,13 @@ package org.opendc.format.trace.gwf -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import java.io.BufferedReader import java.io.File import java.io.InputStream @@ -88,7 +86,8 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntryImpl>() + val workflows = mutableMapOf<Long, Job>() + val starts = mutableMapOf<Long, Long>() val tasks = mutableMapOf<Long, Task>() val taskDependencies = mutableMapOf<Task, List<Long>>() @@ -131,22 +130,21 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { val flops: Long = 4000 * runtime * cores - val entry = entries.getOrPut(workflowId) { - TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) } - val workflow = entry.workload val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to workload)), HashSet(), mapOf( + "workload" to workload, WORKFLOW_TASK_CORES to cores, WORKFLOW_TASK_DEADLINE to (runtime * 1000) ), ) - entry.submissionTime = min(entry.submissionTime, submitTime) + starts.merge(workflowId, submitTime, ::min) (workflow.tasks as MutableSet<Task>).add(task) tasks[taskId] = task taskDependencies[task] = dependencies @@ -165,7 +163,9 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } + .sortedBy { it.start } + .iterator() } override fun hasNext(): Boolean = iterator.hasNext() @@ -173,20 +173,4 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { override fun next(): TraceEntry<Job> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: Job - ) : TraceEntry<Job> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt index 07785632..1eb4bac2 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -22,14 +22,12 @@ package org.opendc.format.trace.sc20 -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -49,17 +47,17 @@ public class Sc20TraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<UUID, TraceEntry<ComputeWorkload>>() + val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>() val timestampCol = 0 val cpuUsageCol = 1 @@ -85,7 +83,7 @@ public class Sc20TraceReader( var vmId = "" var maxCores = -1 var requiredMemory = -1L - var timestamp = -1L + var timestamp: Long var cores = -1 var minTime = Long.MAX_VALUE @@ -157,50 +155,27 @@ public class Sc20TraceReader( Random(random.nextInt()) ) val workload = SimTraceWorkload(flopsFragments.asSequence()) - val vmWorkload = ComputeWorkload( + entries[uuid] = TraceEntry( uuid, - "VM Workload $vmId", - UnnamedUser, - Image( - uuid, - vmId, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - entries[uuid] = TraceEntryImpl( + vmId, minTime, - vmWorkload + workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory, + "workload" to workload + ) ) } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = entries.values.sortedBy { it.start }.iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt index ead20c35..0d1f3cea 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -22,12 +22,10 @@ package org.opendc.format.trace.swf -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -43,17 +41,17 @@ import java.util.* public class SwfTraceReader( file: File, maxNumCores: Int = -1 -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() val jobNumberCol = 0 val submitTimeCol = 1 // seconds (begin of trace is 0) @@ -73,7 +71,6 @@ public class SwfTraceReader( var slicedWaitTime: Long var flopsPerSecond: Long var flopsPartialSlice: Long - var flopsFullSlice: Long var runtimePartialSliceRemainder: Long BufferedReader(FileReader(file)).use { reader -> @@ -127,7 +124,6 @@ public class SwfTraceReader( flopsPerSecond = 4_000L * cores runtimePartialSliceRemainder = runTime % sliceDuration flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder - flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice for ( tick in (submitTime + slicedWaitTime) @@ -155,48 +151,27 @@ public class SwfTraceReader( val uuid = UUID(0L, jobNumber) val workload = SimTraceWorkload(flopsHistory.asSequence()) - val vmWorkload = ComputeWorkload( + entries[jobNumber] = TraceEntry( uuid, - "SWF Workload $jobNumber", - UnnamedUser, - Image( - uuid, - jobNumber.toString(), - mapOf( - "cores" to cores, - "required-memory" to memory, - "workload" to workload - ) + jobNumber.toString(), + submitTime, + workload, + mapOf( + "cores" to cores, + "required-memory" to memory, + "workload" to workload ) ) - - entries[jobNumber] = TraceEntryImpl(submitTime, vmWorkload) } } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = entries.values.sortedBy { it.start }.iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index 5a271fab..feadf61f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,15 +25,13 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import java.util.UUID import kotlin.math.min @@ -53,10 +51,12 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntryImpl>() + val workflows = mutableMapOf<Long, Job>() + val starts = mutableMapOf<Long, Long>() val tasks = mutableMapOf<Long, Task>() val taskDependencies = mutableMapOf<Task, List<Long>>() + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build() while (true) { @@ -74,29 +74,22 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { val flops: Long = 4100 * (runtime / 1000) * cores - val entry = entries.getOrPut(workflowId) { - TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) } - val workflow = entry.workload val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - Image( - UUID.randomUUID(), - "<unnamed>", - mapOf( - "workload" to workload - ) - ), HashSet(), mapOf( + "workload" to workload, WORKFLOW_TASK_CORES to cores, WORKFLOW_TASK_DEADLINE to runtime ) ) - entry.submissionTime = min(entry.submissionTime, submitTime) + starts.merge(workflowId, submitTime, ::min) (workflow.tasks as MutableSet<Task>).add(task) tasks[taskId] = task taskDependencies[task] = dependencies @@ -112,7 +105,9 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } + .sortedBy { it.start } + .iterator() } override fun hasNext(): Boolean = iterator.hasNext() @@ -120,20 +115,4 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { override fun next(): TraceEntry<Job> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: Job - ) : TraceEntry<Job> } diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt index 7e3d2623..e0e049cf 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -32,14 +32,14 @@ class SwfTraceReaderTest { internal fun testParseSwf() { val reader = SwfTraceReader(File(SwfTraceReaderTest::class.java.getResource("/swf_trace.txt").toURI())) var entry = reader.next() - assertEquals(0, entry.submissionTime) + assertEquals(0, entry.start) // 1961 slices for waiting, 3 full and 1 partial running slices - assertEquals(1965, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) + assertEquals(1965, (entry.workload as SimTraceWorkload).trace.toList().size) entry = reader.next() - assertEquals(164472, entry.submissionTime) + assertEquals(164472, entry.start) // 1188 slices for waiting, 0 full and 1 partial running slices - assertEquals(1189, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) - assertEquals(0.25, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().last().usage) + assertEquals(1189, (entry.workload as SimTraceWorkload).trace.toList().size) + assertEquals(0.25, (entry.workload as SimTraceWorkload).trace.toList().last().usage) } } diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt index 58d96657..bcfa7553 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt @@ -36,11 +36,11 @@ class WtfTraceReaderTest { fun testParseWtf() { val reader = WtfTraceReader("src/test/resources/wtf-trace") var entry = reader.next() - assertEquals(0, entry.submissionTime) + assertEquals(0, entry.start) assertEquals(23, entry.workload.tasks.size) entry = reader.next() - assertEquals(333387, entry.submissionTime) + assertEquals(333387, entry.start) assertEquals(23, entry.workload.tasks.size) } } diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt deleted file mode 100644 index f1d4ea2e..00000000 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.metal - -/** - * An enumeration describing the possible states of a bare-metal compute node. - */ -public enum class NodeState { - /** - * The node is booting. - */ - BOOT, - - /** - * The node is powered off. - */ - SHUTOFF, - - /** - * The node is active and running. - */ - ACTIVE, - - /** - * The node is in error. - */ - ERROR, - - /** - * The state of the node is unknown. - */ - UNKNOWN, -} diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt deleted file mode 100644 index 3b15be94..00000000 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.metal.driver - -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.api.Image -import org.opendc.compute.api.Server -import org.opendc.core.services.AbstractServiceKey -import org.opendc.metal.Node -import java.util.UUID - -/** - * A driver interface for the management interface of a bare-metal compute node. - */ -public interface BareMetalDriver { - /** - * The [Node] that is controlled by this driver. - */ - public val node: Flow<Node> - - /** - * The amount of work done by the machine in percentage with respect to the total amount of processing power - * available. - */ - public val usage: Flow<Double> - - /** - * Initialize the driver. - */ - public suspend fun init(): Node - - /** - * Start the bare metal node with the specified boot disk image. - */ - public suspend fun start(): Node - - /** - * Stop the bare metal node if it is running. - */ - public suspend fun stop(): Node - - /** - * Reboot the bare metal node. - */ - public suspend fun reboot(): Node - - /** - * Update the boot disk image of the compute node. - * - * Changing the boot disk image of node does not affect it while the node is running. In order to start the new boot - * disk image, the compute node must be restarted. - */ - public suspend fun setImage(image: Image): Node - - /** - * Obtain the state of the compute node. - */ - public suspend fun refresh(): Node - - /** - * A key that allows access to the [BareMetalDriver] instance from a [Server] that runs on the bare-metal machine. - */ - public companion object Key : AbstractServiceKey<BareMetalDriver>(UUID.randomUUID(), "bare-metal:driver") -} diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt deleted file mode 100644 index 2d6353c8..00000000 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.metal.service - -import kotlinx.coroutines.CancellationException -import org.opendc.compute.api.Image -import org.opendc.metal.Node -import org.opendc.metal.driver.BareMetalDriver - -/** - * A very basic implementation of the [ProvisioningService]. - */ -public class SimpleProvisioningService : ProvisioningService { - /** - * The active nodes in this service. - */ - private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf() - - override suspend fun create(driver: BareMetalDriver): Node { - val node = driver.init() - nodes[node] = driver - return node - } - - override suspend fun nodes(): Set<Node> = nodes.keys - - override suspend fun refresh(node: Node): Node { - return nodes[node]!!.refresh() - } - - override suspend fun deploy(node: Node, image: Image): Node { - val driver = nodes[node]!! - driver.setImage(image) - return driver.reboot() - } - - override suspend fun stop(node: Node): Node { - val driver = nodes[node]!! - return try { - driver.stop() - } catch (e: CancellationException) { - node - } - } -} diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts index d0b80cc7..d07fe7a6 100644 --- a/simulator/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc-runner-web/build.gradle.kts @@ -34,7 +34,6 @@ application { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) implementation(project(":opendc-compute:opendc-compute-simulator")) implementation(project(":opendc-format")) implementation(project(":opendc-experiments:opendc-experiments-capelin")) diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 482fe754..b9aeecb8 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -46,11 +46,11 @@ import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolic import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* -import org.opendc.experiments.capelin.experiment.attachMonitor -import org.opendc.experiments.capelin.experiment.createFailureDomain -import org.opendc.experiments.capelin.experiment.createProvisioner -import org.opendc.experiments.capelin.experiment.processTrace +import org.opendc.experiments.capelin.attachMonitor +import org.opendc.experiments.capelin.createComputeService +import org.opendc.experiments.capelin.createFailureDomain import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.capelin.processTrace import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader @@ -247,7 +247,7 @@ public class RunnerCli : CliktCommand(name = "runner") { val tracer = EventTracer(clock) testScope.launch { - val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( + val scheduler = createComputeService( this, clock, environment, @@ -262,7 +262,7 @@ public class RunnerCli : CliktCommand(name = "runner") { clock, seeder.nextInt(), operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, - bareMetalProvisioner, + scheduler, chan ) } else { @@ -287,7 +287,6 @@ public class RunnerCli : CliktCommand(name = "runner") { failureDomain?.cancel() scheduler.close() - provisioner.close() } try { diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index 2f11347d..e7e99a3d 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -28,36 +28,24 @@ import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Field import com.mongodb.client.model.Filters import com.mongodb.client.model.Projections -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch import org.bson.Document import org.bson.types.ObjectId -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.NODE_CLUSTER -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import java.time.Clock import java.util.* /** * A helper class that converts the MongoDB topology into an OpenDC environment. */ public class TopologyParser(private val collection: MongoCollection<Document>, private val id: ObjectId) : EnvironmentReader { - /** - * Parse the topology with the specified [id]. - */ - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { - val nodes = mutableListOf<SimBareMetalDriver>() + + public override fun read(): List<MachineDef> { + val nodes = mutableListOf<MachineDef>() val random = Random(0) for (machine in fetchMachines(id)) { @@ -85,36 +73,17 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p val energyConsumptionW = machine.getList("cpus", Document::class.java).sumBy { it.getInteger("energyConsumptionW") }.toDouble() nodes.add( - SimBareMetalDriver( - coroutineScope, - clock, + MachineDef( UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$position", - mapOf(NODE_CLUSTER to clusterId), + mapOf("cluster" to clusterId), SimMachineModel(processors, memoryUnits), LinearPowerModel(2 * energyConsumptionW, .5) ) ) } - val provisioningService = SimpleProvisioningService() - coroutineScope.launch { - for (node in nodes) { - provisioningService.create(node) - } - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "opendc-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(fetchName(id), null, listOf(platform)) + return nodes } override fun close() {} diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index fe814c76..a8ac6c10 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -27,10 +27,9 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.telemetry.HostEvent -import org.opendc.metal.Node -import org.opendc.metal.NodeState import kotlin.math.max /** @@ -38,7 +37,7 @@ import kotlin.math.max */ public class WebExperimentMonitor : ExperimentMonitor { private val logger = KotlinLogging.logger {} - private val currentHostEvent = mutableMapOf<Node, HostEvent>() + private val currentHostEvent = mutableMapOf<Host, HostEvent>() private var startTime = -1L override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { @@ -50,12 +49,8 @@ public class WebExperimentMonitor : ExperimentMonitor { } } - override fun reportHostStateChange( - time: Long, - driver: Host, - host: Node - ) { - logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } + override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { + logger.debug { "Host ${host.uid} changed state $newState [$time]" } val previousEvent = currentHostEvent[host] @@ -84,9 +79,9 @@ public class WebExperimentMonitor : ExperimentMonitor { ) } - private val lastPowerConsumption = mutableMapOf<Node, Double>() + private val lastPowerConsumption = mutableMapOf<Host, Double>() - override fun reportPowerConsumption(host: Node, draw: Double) { + override fun reportPowerConsumption(host: Host, draw: Double) { lastPowerConsumption[host] = draw } @@ -99,7 +94,7 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - host: Node, + host: Host, duration: Long ) { val previousEvent = currentHostEvent[host] @@ -117,7 +112,7 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -135,7 +130,7 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -155,7 +150,7 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -164,7 +159,7 @@ public class WebExperimentMonitor : ExperimentMonitor { } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap<Node, HostMetrics> = mutableMapOf() + private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf() private fun processHostEvent(event: HostEvent) { val slices = event.duration / SLICE_LENGTH @@ -175,14 +170,14 @@ public class WebExperimentMonitor : ExperimentMonitor { hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst, hostAggregateMetrics.totalInterferedBurst + event.interferedBurst, hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)), - hostAggregateMetrics.totalFailureSlices + if (event.node.state != NodeState.ACTIVE) slices.toLong() else 0, - hostAggregateMetrics.totalFailureVmSlices + if (event.node.state != NodeState.ACTIVE) event.vmCount * slices.toLong() else 0 + hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0, + hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0 ) - hostMetrics.compute(event.node) { _, prev -> + hostMetrics.compute(event.host) { _, prev -> HostMetrics( - (event.cpuUsage.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (event.cpuDemand.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), + (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), event.vmCount + (prev?.vmCount ?: 0), 1 + (prev?.count ?: 0) ) diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt deleted file mode 100644 index 996e7700..00000000 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.utils.flow - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.ConflatedBroadcastChannel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.asFlow - -/** - * A [Flow] that contains a single value that changes over time. - * - * This class exists to implement the DataFlow/StateFlow functionality that will be implemented in `kotlinx-coroutines` - * in the future, but is not available yet. - * See: https://github.com/Kotlin/kotlinx.coroutines/pull/1354 - */ -public interface StateFlow<T> : Flow<T> { - /** - * The current value of this flow. - * - * Setting a value that is [equal][Any.equals] to the previous one does nothing. - */ - public var value: T -} - -/** - * Creates a [StateFlow] with a given initial [value]. - */ -@Suppress("FunctionName") -public fun <T> StateFlow(value: T): StateFlow<T> = StateFlowImpl(value) - -/** - * Internal implementation of the [StateFlow] interface. - */ -@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -private class StateFlowImpl<T>(initialValue: T) : StateFlow<T> { - /** - * The [BroadcastChannel] to back this flow. - */ - private val chan = ConflatedBroadcastChannel(initialValue) - - /** - * The internal [Flow] backing this flow. - */ - private val flow = chan.asFlow() - - public override var value: T = initialValue - set(value) { - chan.offer(value) - field = value - } - - @InternalCoroutinesApi - override suspend fun collect(collector: FlowCollector<T>) = flow.collect(collector) -} diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/User.kt b/simulator/opendc-workflow/build.gradle.kts index fc542cef..3cefa409 100644 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/User.kt +++ b/simulator/opendc-workflow/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,14 +20,4 @@ * SOFTWARE. */ -package org.opendc.core - -/** - * A user of the cloud network. - */ -public interface User : Identity { - /** - * The name of the user. - */ - override val name: String -} +description = "Workflow orchestration for OpenDC" diff --git a/simulator/opendc-metal/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts index 9207de18..d3e67bee 100644 --- a/simulator/opendc-metal/build.gradle.kts +++ b/simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Bare-metal provisioning in OpenDC" +description = "Workflow orchestration service API for OpenDC" /* Build configuration */ plugins { @@ -29,10 +29,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) - implementation("io.github.microutils:kotlin-logging") } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt index f1cfdf65..5e8b0b9e 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt +++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,8 @@ * SOFTWARE. */ -package org.opendc.workflows.workload +package org.opendc.workflow.api -import org.opendc.core.User -import org.opendc.core.workload.Workload import java.util.* /** @@ -31,17 +29,15 @@ import java.util.* * * @property uid A unique identified of this workflow. * @property name The name of this workflow. - * @property owner The owner of the workflow. * @property tasks The tasks that are part of this workflow. * @property metadata Additional metadata for the job. */ public data class Job( - override val uid: UUID, - override val name: String, - override val owner: User, - public val tasks: Set<Task>, - public val metadata: Map<String, Any> = emptyMap() -) : Workload { + val uid: UUID, + val name: String, + val tasks: Set<Task>, + val metadata: Map<String, Any> = emptyMap() +) { override fun equals(other: Any?): Boolean = other is Job && uid == other.uid override fun hashCode(): Int = uid.hashCode() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt index 4305aa57..db208998 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt +++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package org.opendc.workflows.workload +package org.opendc.workflow.api /** * Meta-data key for the deadline of a task. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt index 4c6d2842..d91f9879 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt +++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,8 @@ * SOFTWARE. */ -package org.opendc.workflows.workload +package org.opendc.workflow.api -import org.opendc.compute.api.Image -import org.opendc.core.Identity import java.util.* /** @@ -38,12 +34,11 @@ import java.util.* * @property metadata Additional metadata for this task. */ public data class Task( - override val uid: UUID, - override val name: String, - public val image: Image, - public val dependencies: Set<Task>, - public val metadata: Map<String, Any> = emptyMap() -) : Identity { + val uid: UUID, + val name: String, + val dependencies: Set<Task>, + val metadata: Map<String, Any> = emptyMap() +) { override fun equals(other: Any?): Boolean = other is Task && uid == other.uid override fun hashCode(): Int = uid.hashCode() diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts index b6a2fc45..12a54235 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Workflow service for OpenDC" +description = "Workflow orchestration service for OpenDC" /* Build configuration */ plugins { @@ -30,7 +30,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) + api(project(":opendc-workflow:opendc-workflow-api")) api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt index bcf93562..bb2ad6c6 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service import org.opendc.trace.core.Event -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task /** * An event emitted by the [WorkflowService]. diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt new file mode 100644 index 00000000..2f83e376 --- /dev/null +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.api.ComputeClient +import org.opendc.trace.core.EventTracer +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A service for cloud workflow management. + * + * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. + */ +public interface WorkflowService : AutoCloseable { + /** + * The events emitted by the workflow scheduler. + */ + public val events: Flow<WorkflowEvent> + + /** + * Submit the specified [Job] to the workflow service for scheduling. + */ + public suspend fun submit(job: Job) + + /** + * Terminate the lifecycle of the workflow service, stopping all running workflows. + */ + public override fun close() + + public companion object { + /** + * Construct a new [WorkflowService] implementation. + * + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param tracer The event tracer to use. + * @param compute The compute client to use. + * @param mode The scheduling mode to use. + * @param jobAdmissionPolicy The job admission policy to use. + * @param jobOrderPolicy The job order policy to use. + * @param taskEligibilityPolicy The task eligibility policy to use. + * @param taskOrderPolicy The task order policy to use. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + tracer: EventTracer, + compute: ComputeClient, + mode: WorkflowSchedulerMode, + jobAdmissionPolicy: JobAdmissionPolicy, + jobOrderPolicy: JobOrderPolicy, + taskEligibilityPolicy: TaskEligibilityPolicy, + taskOrderPolicy: TaskOrderPolicy + ): WorkflowService { + return WorkflowServiceImpl( + context, + clock, + tracer, + compute, + mode, + jobAdmissionPolicy, + jobOrderPolicy, + taskEligibilityPolicy, + taskOrderPolicy + ) + } + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt index 89849f6a..1bb67169 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal -import org.opendc.workflows.workload.Job +import org.opendc.workflow.api.Job public class JobState(public val job: Job, public val submittedAt: Long) { /** diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt index ef9714c2..c3ce1492 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal import org.opendc.compute.api.Server -import org.opendc.workflows.workload.Task +import org.opendc.workflow.api.Task public class TaskState(public val job: JobState, public val task: Task) { /** @@ -39,12 +39,12 @@ public class TaskState(public val job: JobState, public val task: Task) { /** * The dependencies of this task. */ - public val dependencies: HashSet<TaskState> = HashSet<TaskState>() + public val dependencies: HashSet<TaskState> = HashSet() /** * The dependents of this task. */ - public val dependents: HashSet<TaskState> = HashSet<TaskState>() + public val dependents: HashSet<TaskState> = HashSet() /** * A flag to indicate whether this workflow task instance is a workflow root. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt index 99f5bb87..fe941d09 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal /** * The state of a workflow task. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt index 18721889..29c6aeea 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal -public interface StageWorkflowSchedulerListener { - public fun cycleStarted(scheduler: StageWorkflowService) {} - public fun cycleFinished(scheduler: StageWorkflowService) {} +public interface WorkflowSchedulerListener { + public fun cycleStarted(scheduler: WorkflowServiceImpl) {} + public fun cycleFinished(scheduler: WorkflowServiceImpl) {} public fun jobSubmitted(job: JobState) {} public fun jobStarted(job: JobState) {} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 6b348ed4..85a88acd 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch @@ -32,21 +33,24 @@ import org.opendc.compute.api.* import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow import org.opendc.trace.core.enable -import org.opendc.workflows.service.stage.job.JobAdmissionPolicy -import org.opendc.workflows.service.stage.job.JobOrderPolicy -import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.TaskOrderPolicy -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.service.* +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock import java.util.* +import kotlin.coroutines.CoroutineContext /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Datacenter Scheduling. */ -public class StageWorkflowService( - internal val coroutineScope: CoroutineScope, +public class WorkflowServiceImpl( + context: CoroutineContext, internal val clock: Clock, internal val tracer: EventTracer, private val computeClient: ComputeClient, @@ -57,6 +61,11 @@ public class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy ) : WorkflowService, ServerWatcher { /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + internal val scope = CoroutineScope(context) + + /** * The logger instance to use. */ private val logger = KotlinLogging.logger {} @@ -99,17 +108,17 @@ public class StageWorkflowService( /** * The root listener of this scheduler. */ - private val rootListener = object : StageWorkflowSchedulerListener { + private val rootListener = object : WorkflowSchedulerListener { /** * The listeners to delegate to. */ - val listeners = mutableSetOf<StageWorkflowSchedulerListener>() + val listeners = mutableSetOf<WorkflowSchedulerListener>() - override fun cycleStarted(scheduler: StageWorkflowService) { + override fun cycleStarted(scheduler: WorkflowServiceImpl) { listeners.forEach { it.cycleStarted(scheduler) } } - override fun cycleFinished(scheduler: StageWorkflowService) { + override fun cycleFinished(scheduler: WorkflowServiceImpl) { listeners.forEach { it.cycleFinished(scheduler) } } @@ -145,6 +154,7 @@ public class StageWorkflowService( private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic + private lateinit var image: Image init { this.mode = mode(this) @@ -152,6 +162,9 @@ public class StageWorkflowService( this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) + scope.launch { + image = computeClient.newImage("workflow-runner") + } } override val events: Flow<WorkflowEvent> = tracer.openRecording().let { @@ -190,6 +203,10 @@ public class StageWorkflowService( requestCycle() } + override fun close() { + scope.cancel() + } + /** * Indicate to the scheduler that a scheduling cycle is needed. */ @@ -214,7 +231,12 @@ public class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job)) + tracer.commit( + WorkflowEvent.JobStarted( + this, + jobInstance.job + ) + ) rootListener.jobStarted(jobInstance) } @@ -258,16 +280,27 @@ public class StageWorkflowService( val instance = taskQueue.peek() val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1 - val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task - val image = instance.task.image - coroutineScope.launch { - val server = computeClient.newServer(instance.task.name, image, flavor) + val image = image + scope.launch { + val flavor = computeClient.newFlavor( + instance.task.name, + cores, + 1000 + ) // TODO How to determine memory usage for workflow task + val server = computeClient.newServer( + instance.task.name, + image, + flavor, + start = false, + meta = instance.task.metadata + ) instance.state = TaskStatus.ACTIVE instance.server = server taskByServer[server] = instance - server.watch(this@StageWorkflowService) + server.watch(this@WorkflowServiceImpl) + server.start() } activeTasks += instance @@ -278,20 +311,28 @@ public class StageWorkflowService( public override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { - ServerState.ACTIVE -> { + ServerState.PROVISIONING -> { + } + ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() tracer.commit( WorkflowEvent.TaskStarted( - this@StageWorkflowService, + this@WorkflowServiceImpl, task.job.job, task.task ) ) rootListener.taskStarted(task) } - ServerState.SHUTOFF, ServerState.ERROR -> { + ServerState.TERMINATED, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() + + scope.launch { + server.delete() + server.flavor.delete() + } + val job = task.job task.state = TaskStatus.FINISHED task.finishedAt = clock.millis() @@ -299,7 +340,7 @@ public class StageWorkflowService( activeTasks -= task tracer.commit( WorkflowEvent.TaskFinished( - this@StageWorkflowService, + this@WorkflowServiceImpl, task.job.job, task.task ) @@ -322,6 +363,8 @@ public class StageWorkflowService( requestCycle() } + ServerState.DELETED -> { + } else -> throw IllegalStateException() } } @@ -332,11 +375,11 @@ public class StageWorkflowService( rootListener.jobFinished(job) } - public fun addListener(listener: StageWorkflowSchedulerListener) { + public fun addListener(listener: WorkflowSchedulerListener) { rootListener.listeners += listener } - public fun removeListener(listener: StageWorkflowSchedulerListener) { + public fun removeListener(listener: WorkflowSchedulerListener) { rootListener.listeners -= listener } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt index d76579f9..359fc223 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage +package org.opendc.workflow.service.scheduler -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.io.Serializable /** @@ -32,5 +32,5 @@ public interface StagePolicy<T : Any> : Serializable { /** * Build the logic of the stage policy. */ - public operator fun invoke(scheduler: StageWorkflowService): T + public operator fun invoke(scheduler: WorkflowServiceImpl): T } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt index cf8f92e0..58e7893f 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.scheduler import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * The operating mode of a workflow scheduler. @@ -44,9 +44,9 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. */ public object Interactive : WorkflowSchedulerMode() { - override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { override fun requestCycle() { - scheduler.coroutineScope.launch { scheduler.schedule() } + scheduler.scope.launch { scheduler.schedule() } } } @@ -59,14 +59,14 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo public data class Batch(val quantum: Long) : WorkflowSchedulerMode() { private var next: kotlinx.coroutines.Job? = null - override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { override fun requestCycle() { if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. val delay = quantum - (scheduler.clock.millis() % quantum) - val job = scheduler.coroutineScope.launch { + val job = scheduler.scope.launch { delay(delay) next = null scheduler.schedule() @@ -85,12 +85,12 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { private var next: kotlinx.coroutines.Job? = null - override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { override fun requestCycle() { if (next == null) { val delay = random.nextInt(200).toLong() - val job = scheduler.coroutineScope.launch { + val job = scheduler.scope.launch { delay(delay) next = null scheduler.schedule() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt index 1190a408..1b5b91b9 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,21 +20,23 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobOrderPolicy] that orders jobs based on its critical path length. */ public data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = - object : Comparator<JobState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + object : + Comparator<JobState>, + WorkflowSchedulerListener { private val results = HashMap<Job, Long>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt index 0e5a42c0..ed3acff7 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.scheduler.StagePolicy /** * A policy interface for admitting [JobState]s to a scheduling cycle. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt index 83d42b2d..adaa6671 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.scheduler.StagePolicy /** * A policy interface for ordering admitted workflows in the scheduling queue. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt index 6f6ccb50..6a0bfeb9 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobAdmissionPolicy] that limits the amount of active jobs in the system. @@ -31,7 +31,7 @@ import org.opendc.workflows.service.StageWorkflowService * @property limit The maximum number of concurrent jobs in the system. */ public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionPolicy { - override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { override fun invoke( job: JobState ): JobAdmissionPolicy.Advice = diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt index ac74f090..31f8f8db 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobAdmissionPolicy] that admits all jobs. */ public object NullJobAdmissionPolicy : JobAdmissionPolicy { - override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt index 6c747261..1b359125 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,23 +20,23 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.workload.Job +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.util.* import kotlin.collections.HashMap -import kotlin.collections.getValue -import kotlin.collections.set /** * A [JobOrderPolicy] that randomly orders jobs. */ public object RandomJobOrderPolicy : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = - object : Comparator<JobState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + object : + Comparator<JobState>, + WorkflowSchedulerListener { private val random = Random(123) private val ids = HashMap<Job, Int>() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt index c1c244c3..6998606d 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has. */ public data class SizeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = compareBy { it.tasks.size.let { if (ascending) it else -it } } override fun toString(): String { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt index 005f8153..53d06023 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobOrderPolicy] orders jobs in FIFO order. */ public data class SubmissionTimeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = compareBy { it.submittedAt.let { if (ascending) it else -it } } override fun toString(): String { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt index 6a465746..821d4964 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system. */ public data class ActiveTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt index f3f19ef5..42804f5a 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import kotlin.math.max /** @@ -36,8 +36,8 @@ import kotlin.math.max * the average. */ public data class BalancingTaskEligibilityPolicy(public val tolerance: Double = 1.5) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = - object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt index 0020023f..dae7ad99 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks. */ public data class CompletionTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val finished = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt index a9f5eb84..7786f6ec 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has. */ public data class DependenciesTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { it.task.dependencies.size.let { if (ascending) it else -it } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt index e5a9f159..4fb835d7 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has. */ public data class DependentsTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { it.dependents.size.let { if (ascending) it else -it } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt index 7ce8ccce..3a634de7 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job. */ public data class DurationHistoryTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val results = HashMap<JobState, MutableList<Long>>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt index 3674eb01..d9fde53a 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,15 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.util.* import kotlin.collections.HashMap import kotlin.collections.getValue -import kotlin.collections.minusAssign import kotlin.collections.set /** @@ -37,8 +36,8 @@ import kotlin.collections.set */ public data class DurationTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val results = HashMap<UUID, Long>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt index 2dddbc7c..229460df 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system. */ public data class LimitPerJobTaskEligibilityPolicy(public val limit: Int) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = - object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt index fdc1fd5e..57aa0d58 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system. */ public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { override fun invoke( task: TaskState ): TaskEligibilityPolicy.Advice = diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt index b40f9823..cfe2aeed 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskEligibilityPolicy] that always allows new tasks to enter. */ public object NullTaskEligibilityPolicy : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = Logic private object Logic : TaskEligibilityPolicy.Logic { override fun invoke( diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt index a0691b23..a01439c2 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,17 +20,17 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.util.* /** * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability]. */ public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { val random = Random(123) override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt index 890e7165..c12d6a66 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,20 +20,20 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.workload.Task +import org.opendc.workflow.api.Task +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import kotlin.random.Random /** * A [TaskOrderPolicy] that orders the tasks randomly. */ public object RandomTaskOrderPolicy : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val random = Random(123) private val ids = HashMap<Task, Int>() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt index 6b0199b8..e9bbf815 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue. */ public data class SubmissionTimeTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { it.job.submittedAt.let { if (ascending) it else -it } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt index 37597709..ee31aee2 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.scheduler.StagePolicy /** * A policy interface for determining the eligibility of tasks in a scheduling cycle. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt index 5feac6d0..fffcb765 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.scheduler.StagePolicy /** * This interface represents the **T2** stage of the Reference Architecture for Topology Schedulers and provides the diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt index 4207cdfd..2161f5f2 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,9 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach @@ -38,23 +35,24 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.simulator.SimHostProvisioner +import org.opendc.compute.simulator.SimHost import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer -import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy -import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy import kotlin.math.max /** - * Integration test suite for the [StageWorkflowService]. + * Integration test suite for the [WorkflowServiceImpl]. */ -@DisplayName("StageWorkflowService") +@DisplayName("WorkflowServiceImpl") @OptIn(ExperimentalCoroutinesApi::class) internal class StageWorkflowSchedulerIntegrationTest { /** @@ -72,26 +70,27 @@ internal class StageWorkflowSchedulerIntegrationTest { val clock = DelayControllerClockAdapter(testScope) val tracer = EventTracer(clock) - val schedulerAsync = testScope.async { - val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(testScope, clock) } - - val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) + val scheduler = let { + val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) + .use { it.read() } + .map { def -> + SimHost( + def.uid, + def.name, + def.model, + def.meta, + testScope.coroutineContext, + clock, + SimSpaceSharedHypervisorProvider() + ) + } - val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) - val hosts = provisioner.provisionAll() val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) hosts.forEach { compute.addHost(it) } - // Wait for the hypervisors to be spawned - delay(10) - - StageWorkflowService( - testScope, + WorkflowService( + testScope.coroutineContext, clock, tracer, compute.newClient(), @@ -104,7 +103,6 @@ internal class StageWorkflowSchedulerIntegrationTest { } testScope.launch { - val scheduler = schedulerAsync.await() scheduler.events .onEach { event -> when (event) { @@ -119,13 +117,12 @@ internal class StageWorkflowSchedulerIntegrationTest { testScope.launch { val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) - val scheduler = schedulerAsync.await() while (reader.hasNext()) { - val (time, job) = reader.next() + val entry = reader.next() jobsSubmitted++ - delay(max(0, time - clock.millis())) - scheduler.submit(job) + delay(max(0, entry.start - clock.millis())) + scheduler.submit(entry.workload) } } diff --git a/simulator/opendc-workflows/src/test/resources/environment.json b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/environment.json index 0965b250..0965b250 100644 --- a/simulator/opendc-workflows/src/test/resources/environment.json +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/environment.json diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml index 70a0eacc..70a0eacc 100644 --- a/simulator/opendc-workflows/src/test/resources/log4j2.xml +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml diff --git a/simulator/opendc-workflows/src/test/resources/trace.gwf b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/trace.gwf index d264b9c3..d264b9c3 100644 --- a/simulator/opendc-workflows/src/test/resources/trace.gwf +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/trace.gwf diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index 7a82adcd..e87dd4d8 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -22,12 +22,11 @@ rootProject.name = "opendc-simulator" include(":opendc-platform") -include(":opendc-core") include(":opendc-compute:opendc-compute-api") include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") -include(":opendc-metal") -include(":opendc-workflows") +include(":opendc-workflow:opendc-workflow-api") +include(":opendc-workflow:opendc-workflow-service") include(":opendc-format") include(":opendc-experiments:opendc-experiments-sc18") include(":opendc-experiments:opendc-experiments-capelin") |
