diff options
119 files changed, 1534 insertions, 2456 deletions
diff --git a/simulator/README.md b/simulator/README.md index 61ef1d43..d02925d6 100644 --- a/simulator/README.md +++ b/simulator/README.md @@ -23,7 +23,7 @@ This component is responsible for modelling and simulation of datacenters and th - **[opendc-compute](opendc-compute)** The [Infrastructure as a Service](https://en.wikipedia.org/wiki/Infrastructure_as_a_Service) (IaaS) component of OpenDC for computing infrastructure (similar to [Amazon EC2](https://aws.amazon.com/ec2/) and [Google Compute Engine](https://cloud.google.com/compute)). -- **[opendc-workflows](opendc-workflows)** +- **[opendc-workflow](opendc-workflow)** Workflow orchestration service built on top of OpenDC. - **[opendc-format](opendc-format)** Collection of libraries for processing data formats related to (simulation of) cloud computing and datacenters. diff --git a/simulator/opendc-compute/opendc-compute-api/build.gradle.kts b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts index 10046322..835dbbb8 100644 --- a/simulator/opendc-compute/opendc-compute-api/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts @@ -29,5 +29,4 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) } diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt index 025513e6..baa1ba2f 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -22,21 +22,95 @@ package org.opendc.compute.api +import java.util.UUID + /** * A client interface for the OpenDC Compute service. */ public interface ComputeClient : AutoCloseable { /** + * Obtain the list of [Flavor]s accessible by the requesting user. + */ + public suspend fun queryFlavors(): List<Flavor> + + /** + * Obtain a [Flavor] by its unique identifier. + * + * @param id The identifier of the flavor. + */ + public suspend fun findFlavor(id: UUID): Flavor? + + /** + * Create a new [Flavor] instance at this compute service. + * + * @param name The name of the flavor. + * @param cpuCount The amount of CPU cores for this flavor. + * @param memorySize The size of the memory. + * @param labels The identifying labels of the image. + * @param meta The non-identifying meta-data of the image. + */ + public suspend fun newFlavor( + name: String, + cpuCount: Int, + memorySize: Long, + labels: Map<String, String> = emptyMap(), + meta: Map<String, Any> = emptyMap() + ): Flavor + + /** + * Obtain the list of [Image]s accessible by the requesting user. + */ + public suspend fun queryImages(): List<Image> + + /** + * Obtain a [Image] by its unique identifier. + * + * @param id The identifier of the image. + */ + public suspend fun findImage(id: UUID): Image? + + /** + * Create a new [Image] instance at this compute service. + * + * @param name The name of the image. + * @param labels The identifying labels of the image. + * @param meta The non-identifying meta-data of the image. + */ + public suspend fun newImage( + name: String, + labels: Map<String, String> = emptyMap(), + meta: Map<String, Any> = emptyMap() + ): Image + + /** + * Obtain the list of [Server]s accessible by the requesting user. + */ + public suspend fun queryServers(): List<Server> + + /** + * Obtain a [Server] by its unique identifier. + * + * @param id The identifier of the server. + */ + public suspend fun findServer(id: UUID): Server? + + /** * Create a new [Server] instance at this compute service. * * @param name The name of the server to deploy. * @param image The image to be deployed. * @param flavor The flavor of the machine instance to run this [image] on. + * @param labels The identifying labels of the server. + * @param meta The non-identifying meta-data of the server. + * @param start A flag to indicate that the server should be started immediately. */ public suspend fun newServer( name: String, image: Image, - flavor: Flavor + flavor: Flavor, + labels: Map<String, String> = emptyMap(), + meta: Map<String, Any> = emptyMap(), + start: Boolean = true ): Server /** diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt index bf5f0ce4..5f511f91 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt @@ -26,14 +26,19 @@ package org.opendc.compute.api * Flavors define the compute and memory capacity of [Server] instance. To put it simply, a flavor is an available * hardware configuration for a server. It defines the size of a virtual server that can be launched. */ -public data class Flavor( +public interface Flavor : Resource { /** * The number of (virtual) processing cores to use. */ - public val cpuCount: Int, + public val cpuCount: Int /** * The amount of RAM available to the server (in MB). */ public val memorySize: Long -) + + /** + * Delete the flavor instance. + */ + public suspend fun delete() +} diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt index 280c4d89..83e63b81 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt @@ -22,27 +22,12 @@ package org.opendc.compute.api -import org.opendc.core.resource.Resource -import org.opendc.core.resource.TagContainer -import java.util.* - /** * An image containing a bootable operating system that can directly be executed by physical or virtual server. - * - * OpenStack: A collection of files used to create or rebuild a server. Operators provide a number of pre-built OS - * images by default. You may also create custom images from cloud servers you have launched. These custom images are - * useful for backup purposes or for producing “gold” server images if you plan to deploy a particular server - * configuration frequently. */ -public data class Image( - public override val uid: UUID, - public override val name: String, - public override val tags: TagContainer -) : Resource { - public companion object { - /** - * An empty boot disk [Image] that exits immediately on start. - */ - public val EMPTY: Image = Image(UUID.randomUUID(), "empty", emptyMap()) - } +public interface Image : Resource { + /** + * Delete the image instance. + */ + public suspend fun delete() } diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt index ca98dab0..8fbb7308 100644 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,13 +20,10 @@ * SOFTWARE. */ -package org.opendc.metal - -/* - * Common metadata keys for bare-metal nodes. - */ +package org.opendc.compute.api /** - * The cluster to which the node belongs. + * This exception is thrown to indicate that the compute service does not have enough capacity at the moment to + * fulfill a launch request. */ -public const val NODE_CLUSTER: String = "bare-metal:cluster" +public class InsufficientServerCapacityException(override val cause: Throwable? = null) : Exception("There was insufficient capacity available to satisfy the launch request") diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Resource.kt index 64a47277..08120848 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Resource.kt @@ -22,25 +22,34 @@ package org.opendc.compute.api -import org.opendc.core.User -import org.opendc.core.workload.Workload import java.util.UUID /** - * A workload that represents a VM. - * - * @property uid A unique identified of this VM. - * @property name The name of this VM. - * @property owner The owner of the VM. - * @property image The image of the VM. + * A generic resource provided by the OpenDC Compute service. */ -public data class ComputeWorkload( - override val uid: UUID, - override val name: String, - override val owner: User, - val image: Image -) : Workload { - override fun equals(other: Any?): Boolean = other is ComputeWorkload && uid == other.uid +public interface Resource { + /** + * The unique identifier of the resource. + */ + public val uid: UUID + + /** + * The name of the resource. + */ + public val name: String + + /** + * The identifying labels attached to the resource. + */ + public val labels: Map<String, String> + + /** + * The non-identifying metadata attached to the resource. + */ + public val meta: Map<String, Any> - override fun hashCode(): Int = uid.hashCode() + /** + * Refresh the local state of the resource. + */ + public suspend fun refresh() } diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt index ab1eb860..b508a9f8 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt @@ -22,18 +22,11 @@ package org.opendc.compute.api -import org.opendc.core.resource.Resource - /** * A stateful object representing a server instance that is running on some physical or virtual machine. */ public interface Server : Resource { /** - * The name of the server. - */ - public override val name: String - - /** * The flavor of the server. */ public val flavor: Flavor @@ -44,14 +37,33 @@ public interface Server : Resource { public val image: Image /** - * The tags assigned to the server. + * The last known state of the server. */ - public override val tags: Map<String, String> + public val state: ServerState /** - * The last known state of the server. + * Request the server to be started. + * + * This method is guaranteed to return after the request was acknowledged, but might return before the server was + * started. */ - public val state: ServerState + public suspend fun start() + + /** + * Request the server to be stopped. + * + * This method is guaranteed to return after the request was acknowledged, but might return before the server was + * stopped. + */ + public suspend fun stop() + + /** + * Request the server to be deleted. + * + * This method is guaranteed to return after the request was acknowledged, but might return before the server was + * deleted. + */ + public suspend fun delete() /** * Register the specified [ServerWatcher] to watch the state of the server. @@ -66,9 +78,4 @@ public interface Server : Resource { * @param watcher The watcher to de-register from the server. */ public fun unwatch(watcher: ServerWatcher) - - /** - * Refresh the local state of the resource. - */ - public suspend fun refresh() } diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt index 25d2e519..a4d7d7d7 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt @@ -27,27 +27,27 @@ package org.opendc.compute.api */ public enum class ServerState { /** - * The server has not yet finished the original build process. + * Resources are being allocated for the instance. The instance is not running yet. */ - BUILD, + PROVISIONING, /** - * The server was powered down by the user. + * A user shut down the instance. */ - SHUTOFF, + TERMINATED, /** - * The server is active and running. + * The server instance is booting up or running. */ - ACTIVE, + RUNNING, /** - * The server is in error. + * The server is in an error state. */ ERROR, /** - * The state of the server is unknown. + * The server has been deleted and cannot be started later on. */ - UNKNOWN, + DELETED, } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt index 2cd91144..c3c39572 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt @@ -51,6 +51,11 @@ public interface Host { public val state: HostState /** + * Meta-data associated with the host. + */ + public val meta: Map<String, Any> + + /** * The events emitted by the driver. */ public val events: Flow<HostEvent> @@ -74,7 +79,7 @@ public interface Host { public operator fun contains(server: Server): Boolean /** - * Stat the server [instance][server] if it is currently not running on this host. + * Start the server [instance][server] if it is currently not running on this host. * * @throws IllegalArgumentException if the server is not present on the host. */ @@ -88,9 +93,9 @@ public interface Host { public suspend fun stop(server: Server) /** - * Terminate the specified [instance][server] on this host and cleanup all resources associated with it. + * Delete the specified [instance][server] on this host and cleanup all resources associated with it. */ - public suspend fun terminate(server: Server) + public suspend fun delete(server: Server) /** * Add a [HostListener] to this host. diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt index 6548767e..29f10e27 100644 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt @@ -20,45 +20,43 @@ * SOFTWARE. */ -package org.opendc.metal.service +package org.opendc.compute.service.internal -import org.opendc.compute.api.Image -import org.opendc.core.services.AbstractServiceKey -import org.opendc.metal.Node -import org.opendc.metal.driver.BareMetalDriver +import org.opendc.compute.api.Flavor import java.util.UUID /** - * A cloud platform service for provisioning bare-metal compute nodes on the platform. + * A [Flavor] implementation that is passed to clients but delegates its implementation to another class. */ -public interface ProvisioningService { - /** - * Create a new bare-metal compute node. - */ - public suspend fun create(driver: BareMetalDriver): Node +internal class ClientFlavor(private val delegate: Flavor) : Flavor { + override val uid: UUID = delegate.uid - /** - * Obtain the available nodes. - */ - public suspend fun nodes(): Set<Node> + override var name: String = delegate.name + private set - /** - * Refresh the state of a compute node. - */ - public suspend fun refresh(node: Node): Node + override var cpuCount: Int = delegate.cpuCount + private set - /** - * Deploy the specified [Image] on a compute node. - */ - public suspend fun deploy(node: Node, image: Image): Node + override var memorySize: Long = delegate.memorySize + private set - /** - * Stop the specified [Node] . - */ - public suspend fun stop(node: Node): Node + override var labels: Map<String, String> = delegate.labels.toMap() + private set - /** - * The service key of this service. - */ - public companion object Key : AbstractServiceKey<ProvisioningService>(UUID.randomUUID(), "provisioner") + override var meta: Map<String, Any> = delegate.meta.toMap() + private set + + override suspend fun delete() { + delegate.delete() + } + + override suspend fun refresh() { + delegate.refresh() + + name = delegate.name + cpuCount = delegate.cpuCount + memorySize = delegate.memorySize + labels = delegate.labels + meta = delegate.meta + } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt index b24f80da..6c5b2ab0 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,31 +20,36 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.compute.service.internal -import kotlinx.coroutines.flow.Flow -import org.opendc.core.services.AbstractServiceKey -import org.opendc.workflows.workload.Job +import org.opendc.compute.api.Image import java.util.* /** - * A service for cloud workflow management. - * - * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. + * An [Image] implementation that is passed to clients but delegates its implementation to another class. */ -public interface WorkflowService { - /** - * The events emitted by the workflow scheduler. - */ - public val events: Flow<WorkflowEvent> - - /** - * Submit the specified [Job] to the workflow service for scheduling. - */ - public suspend fun submit(job: Job) - - /** - * The service key for the workflow scheduler. - */ - public companion object Key : AbstractServiceKey<WorkflowService>(UUID.randomUUID(), "workflows") +internal class ClientImage(private val delegate: Image) : Image { + override val uid: UUID = delegate.uid + + override var name: String = delegate.name + private set + + override var labels: Map<String, String> = delegate.labels.toMap() + private set + + override var meta: Map<String, Any> = delegate.meta.toMap() + private set + + override suspend fun delete() { + delegate.delete() + refresh() + } + + override suspend fun refresh() { + delegate.refresh() + + name = delegate.name + labels = delegate.labels + meta = delegate.meta + } } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index f84b7435..ae4cee3b 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -46,12 +46,30 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche override var image: Image = delegate.image private set - override var tags: Map<String, String> = delegate.tags.toMap() + override var labels: Map<String, String> = delegate.labels.toMap() + private set + + override var meta: Map<String, Any> = delegate.meta.toMap() private set override var state: ServerState = delegate.state private set + override suspend fun start() { + delegate.start() + refresh() + } + + override suspend fun stop() { + delegate.stop() + refresh() + } + + override suspend fun delete() { + delegate.delete() + refresh() + } + override fun watch(watcher: ServerWatcher) { if (watchers.isEmpty()) { delegate.watch(this) @@ -69,10 +87,13 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche } override suspend fun refresh() { + delegate.refresh() + name = delegate.name flavor = delegate.flavor image = delegate.image - tags = delegate.tags + labels = delegate.labels + meta = delegate.meta state = delegate.state } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 69d6bb59..2c38f7cb 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,11 +22,8 @@ package org.opendc.compute.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.cancel +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService @@ -41,9 +38,7 @@ import org.opendc.utils.TimerScheduler import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* -import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume import kotlin.math.max /** @@ -87,12 +82,27 @@ public class ComputeServiceImpl( /** * The servers that should be launched by the service. */ - private val queue: Deque<LaunchRequest> = ArrayDeque() + private val queue: Deque<SchedulingRequest> = ArrayDeque() /** * The active servers in the system. */ - private val activeServers: MutableSet<Server> = mutableSetOf() + private val activeServers: MutableMap<Server, Host> = mutableMapOf() + + /** + * The registered flavors for this compute service. + */ + internal val flavors = mutableMapOf<UUID, InternalFlavor>() + + /** + * The registered images for this compute service. + */ + internal val images = mutableMapOf<UUID, InternalImage>() + + /** + * The registered servers for this compute service. + */ + private val servers = mutableMapOf<UUID, InternalServer>() public var submittedVms: Int = 0 public var queuedVms: Int = 0 @@ -126,7 +136,74 @@ public class ComputeServiceImpl( override fun newClient(): ComputeClient = object : ComputeClient { private var isClosed: Boolean = false - override suspend fun newServer(name: String, image: Image, flavor: Flavor): Server { + override suspend fun queryFlavors(): List<Flavor> { + check(!isClosed) { "Client is already closed" } + + return flavors.values.map { ClientFlavor(it) } + } + + override suspend fun findFlavor(id: UUID): Flavor? { + check(!isClosed) { "Client is already closed" } + + return flavors[id]?.let { ClientFlavor(it) } + } + + override suspend fun newFlavor( + name: String, + cpuCount: Int, + memorySize: Long, + labels: Map<String, String>, + meta: Map<String, Any> + ): Flavor { + check(!isClosed) { "Client is already closed" } + + val uid = UUID(clock.millis(), random.nextLong()) + val flavor = InternalFlavor( + this@ComputeServiceImpl, + uid, + name, + cpuCount, + memorySize, + labels, + meta + ) + + flavors[uid] = flavor + + return ClientFlavor(flavor) + } + + override suspend fun queryImages(): List<Image> { + check(!isClosed) { "Client is already closed" } + + return images.values.map { ClientImage(it) } + } + + override suspend fun findImage(id: UUID): Image? { + check(!isClosed) { "Client is already closed" } + + return images[id]?.let { ClientImage(it) } + } + + override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image { + check(!isClosed) { "Client is already closed" } + + val uid = UUID(clock.millis(), random.nextLong()) + val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) + + images[uid] = image + + return ClientImage(image) + } + + override suspend fun newServer( + name: String, + image: Image, + flavor: Flavor, + labels: Map<String, String>, + meta: Map<String, Any>, + start: Boolean + ): Server { check(!isClosed) { "Client is closed" } tracer.commit(VmSubmissionEvent(name, image, flavor)) @@ -143,11 +220,36 @@ public class ComputeServiceImpl( ) ) - return suspendCancellableCoroutine { cont -> - val request = LaunchRequest(createServer(name, image, flavor), cont) - queue += request - requestCycle() + val uid = UUID(clock.millis(), random.nextLong()) + val server = InternalServer( + this@ComputeServiceImpl, + uid, + name, + flavor, + image, + labels.toMutableMap(), + meta.toMutableMap() + ) + + servers[uid] = server + + if (start) { + server.start() } + + return ClientServer(server) + } + + override suspend fun findServer(id: UUID): Server? { + check(!isClosed) { "Client is already closed" } + + return servers[id]?.let { ClientServer(it) } + } + + override suspend fun queryServers(): List<Server> { + check(!isClosed) { "Client is already closed" } + + return servers.values.map { ClientServer(it) } } override fun close() { @@ -183,22 +285,31 @@ public class ComputeServiceImpl( scope.cancel() } - private fun createServer( - name: String, - image: Image, - flavor: Flavor - ): Server { - return ServerImpl( - uid = UUID(random.nextLong(), random.nextLong()), - name = name, - flavor = flavor, - image = image - ) + internal fun schedule(server: InternalServer) { + logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } + + queue.add(SchedulingRequest(server)) + requestSchedulingCycle() + } + + internal fun delete(flavor: InternalFlavor) { + checkNotNull(flavors.remove(flavor.uid)) { "Flavor was not known" } } - private fun requestCycle() { - // Bail out in case we have already requested a new cycle. - if (scheduler.isTimerActive(Unit)) { + internal fun delete(image: InternalImage) { + checkNotNull(images.remove(image.uid)) { "Image was not known" } + } + + internal fun delete(server: InternalServer) { + checkNotNull(servers.remove(server.uid)) { "Server was not known" } + } + + /** + * Indicate that a new scheduling cycle is needed due to a change to the service's state. + */ + private fun requestSchedulingCycle() { + // Bail out in case we have already requested a new cycle or the queue is empty. + if (scheduler.isTimerActive(Unit) || queue.isEmpty()) { return } @@ -208,20 +319,28 @@ public class ComputeServiceImpl( val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) scheduler.startSingleTimer(Unit, delay) { - schedule() + doSchedule() } } - private fun schedule() { + /** + * Run a single scheduling iteration. + */ + private fun doSchedule() { while (queue.isNotEmpty()) { - val (server, cont) = queue.peekFirst() - val requiredMemory = server.flavor.memorySize - val selectedHv = allocationLogic.select(availableHosts, server) + val request = queue.peek() - if (selectedHv == null || !selectedHv.host.canFit(server)) { + if (request.isCancelled) { + queue.poll() + continue + } + + val server = request.server + val hv = allocationLogic.select(availableHosts, request.server) + if (hv == null || !hv.host.canFit(server)) { logger.trace { "Server $server selected for scheduling but no capacity available for it." } - if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) { + if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { tracer.commit(VmSubmissionInvalidEvent(server.name)) _events.emit( @@ -247,45 +366,62 @@ public class ComputeServiceImpl( } } - logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.host.uid} ${selectedHv.host.name} ${selectedHv.host.model}" } - queue.poll() - - // Speculatively update the hypervisor view information to prevent other images in the queue from - // deciding on stale values. - selectedHv.numberOfActiveServers++ - selectedHv.provisionedCores += server.flavor.cpuCount - selectedHv.availableMemory -= requiredMemory // XXX Temporary hack + val host = hv.host - scope.launch { - try { - cont.resume(ClientServer(server)) - selectedHv.host.spawn(server) - activeServers += server + // Remove request from queue + queue.poll() - tracer.commit(VmScheduledEvent(server.name)) - _events.emit( - ComputeServiceEvent.MetricsAvailable( - this@ComputeServiceImpl, - hostCount, - availableHosts.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms + logger.info { "Assigned server $server to host $host." } + try { + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + hv.numberOfActiveServers++ + hv.provisionedCores += server.flavor.cpuCount + hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack + + scope.launch { + try { + server.assignHost(host) + host.spawn(server) + activeServers[server] = host + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + ComputeServiceEvent.MetricsAvailable( + this@ComputeServiceImpl, + hostCount, + availableHosts.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) ) - ) - } catch (e: Throwable) { - logger.error("Failed to deploy VM", e) + } catch (e: Throwable) { + logger.error("Failed to deploy VM", e) - selectedHv.numberOfActiveServers-- - selectedHv.provisionedCores -= server.flavor.cpuCount - selectedHv.availableMemory += requiredMemory + hv.numberOfActiveServers-- + hv.provisionedCores -= server.flavor.cpuCount + hv.availableMemory += server.flavor.memorySize + } } + } catch (e: Exception) { + logger.warn(e) { "Failed to assign server $server to $host. " } } } } + /** + * A request to schedule an [InternalServer] onto one of the [Host]s. + */ + private data class SchedulingRequest(val server: InternalServer) { + /** + * A flag to indicate that the request is cancelled. + */ + var isCancelled: Boolean = false + } + override fun onStateChanged(host: Host, newState: HostState) { when (newState) { HostState.UP -> { @@ -313,9 +449,7 @@ public class ComputeServiceImpl( ) // Re-schedule on the new machine - if (queue.isNotEmpty()) { - requestCycle() - } + requestSchedulingCycle() } HostState.DOWN -> { logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" } @@ -338,19 +472,23 @@ public class ComputeServiceImpl( ) ) - if (queue.isNotEmpty()) { - requestCycle() - } + requestSchedulingCycle() } } } override fun onStateChanged(host: Host, server: Server, newState: ServerState) { - val serverImpl = server as ServerImpl - serverImpl.state = newState - serverImpl.watchers.forEach { it.onStateChanged(server, newState) } + require(server is InternalServer) { "Invalid server type passed to service" } - if (newState == ServerState.SHUTOFF) { + if (server.host != host) { + // This can happen when a server is rescheduled and started on another machine, while being deleted from + // the old machine. + return + } + + server.state = newState + + if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) { logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } tracer.commit(VmStoppedEvent(server.name)) @@ -379,36 +517,7 @@ public class ComputeServiceImpl( } // Try to reschedule if needed - if (queue.isNotEmpty()) { - requestCycle() - } + requestSchedulingCycle() } } - - public data class LaunchRequest(val server: Server, val cont: Continuation<Server>) - - private class ServerImpl( - override val uid: UUID, - override val name: String, - override val flavor: Flavor, - override val image: Image - ) : Server { - val watchers = mutableListOf<ServerWatcher>() - - override fun watch(watcher: ServerWatcher) { - watchers += watcher - } - - override fun unwatch(watcher: ServerWatcher) { - watchers -= watcher - } - - override suspend fun refresh() { - // No-op: this object is the source-of-truth - } - - override val tags: Map<String, String> = emptyMap() - - override var state: ServerState = ServerState.BUILD - } } diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt index 1c5c7a8d..95e280df 100644 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt @@ -20,53 +20,45 @@ * SOFTWARE. */ -package org.opendc.metal +package org.opendc.compute.service.internal -import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.core.Identity -import java.util.UUID +import java.util.* /** - * A bare-metal compute node. + * Internal stateful representation of a [Flavor]. */ -public data class Node( - /** - * The unique identifier of the node. - */ - public override val uid: UUID, +internal class InternalFlavor( + private val service: ComputeServiceImpl, + override val uid: UUID, + name: String, + cpuCount: Int, + memorySize: Long, + labels: Map<String, String>, + meta: Map<String, Any> +) : Flavor { + override var name: String = name + private set - /** - * The optional name of the node. - */ - public override val name: String, + override var cpuCount: Int = cpuCount + private set - /** - * Metadata of the node. - */ - public val metadata: Map<String, Any>, + override var memorySize: Long = memorySize + private set - /** - * The last known state of the compute node. - */ - public val state: NodeState, + override val labels: MutableMap<String, String> = labels.toMutableMap() - /** - * The flavor of the node. - */ - public val flavor: Flavor, + override val meta: MutableMap<String, Any> = meta.toMutableMap() - /** - * The boot image of the node. - */ - public val image: Image, + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override suspend fun delete() { + service.delete(this) + } + + override fun equals(other: Any?): Boolean = other is InternalFlavor && uid == other.uid - /** - * The events that are emitted by the node. - */ - public val events: Flow<NodeEvent> -) : Identity { override fun hashCode(): Int = uid.hashCode() - override fun equals(other: Any?): Boolean = other is Node && uid == other.uid } diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceKey.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt index 9078ecdd..86f2f6b9 100644 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceKey.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,26 +20,35 @@ * SOFTWARE. */ -package org.opendc.core.services +package org.opendc.compute.service.internal -import org.opendc.core.Identity +import org.opendc.compute.api.Image import java.util.* /** - * An interface for identifying service implementations of the same type (providing the same service). - * - * @param T The shape of the messages the service responds to. + * Internal stateful representation of an [Image]. */ -public interface ServiceKey<T : Any> : Identity +internal class InternalImage( + private val service: ComputeServiceImpl, + override val uid: UUID, + override val name: String, + labels: Map<String, String>, + meta: Map<String, Any> +) : Image { + + override val labels: MutableMap<String, String> = labels.toMutableMap() + + override val meta: MutableMap<String, Any> = meta.toMutableMap() + + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override suspend fun delete() { + service.delete(this) + } + + override fun equals(other: Any?): Boolean = other is InternalImage && uid == other.uid -/** - * Helper class for constructing a [ServiceKey]. - * - * @property uid The unique identifier of the service. - * @property name The name of the service. - */ -public abstract class AbstractServiceKey<T : Any>(override val uid: UUID, override val name: String) : ServiceKey<T> { - override fun equals(other: Any?): Boolean = other is ServiceKey<*> && uid == other.uid override fun hashCode(): Int = uid.hashCode() - override fun toString(): String = "ServiceKey[uid=$uid, name=$name]" } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt new file mode 100644 index 00000000..ff7c1d15 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.internal + +import mu.KotlinLogging +import org.opendc.compute.api.* +import org.opendc.compute.service.driver.Host +import java.util.UUID + +/** + * Internal implementation of the [Server] interface. + */ +internal class InternalServer( + private val service: ComputeServiceImpl, + override val uid: UUID, + override val name: String, + override val flavor: Flavor, + override val image: Image, + override val labels: MutableMap<String, String>, + override val meta: MutableMap<String, Any> +) : Server { + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The watchers of this server object. + */ + private val watchers = mutableListOf<ServerWatcher>() + + /** + * The [Host] that has been assigned to host the server. + */ + internal var host: Host? = null + + override suspend fun start() { + when (state) { + ServerState.RUNNING -> { + logger.debug { "User tried to start server but server is already running" } + return + } + ServerState.PROVISIONING -> { + logger.debug { "User tried to start server but request is already pending: doing nothing" } + return + } + ServerState.DELETED -> { + logger.warn { "User tried to start terminated server" } + throw IllegalArgumentException("Server is terminated") + } + else -> { + logger.info { "User requested to start server $uid" } + state = ServerState.PROVISIONING + service.schedule(this) + } + } + } + + override suspend fun stop() { + when (state) { + ServerState.PROVISIONING -> {} // TODO Find way to interrupt these + ServerState.RUNNING, ServerState.ERROR -> { + val host = checkNotNull(host) { "Server not running" } + host.stop(this) + } + ServerState.TERMINATED -> {} // No work needed + ServerState.DELETED -> throw IllegalStateException("Server is terminated") + } + } + + override suspend fun delete() { + when (state) { + ServerState.PROVISIONING -> {} // TODO Find way to interrupt these + ServerState.RUNNING -> { + val host = checkNotNull(host) { "Server not running" } + host.delete(this) + service.delete(this) + } + else -> {} // No work needed + } + } + + override fun watch(watcher: ServerWatcher) { + watchers += watcher + } + + override fun unwatch(watcher: ServerWatcher) { + watchers -= watcher + } + + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override var state: ServerState = ServerState.TERMINATED + set(value) { + if (value != field) { + watchers.forEach { it.onStateChanged(this, value) } + } + + field = value + } + + internal fun assignHost(host: Host) { + this.host = host + } + + override fun equals(other: Any?): Boolean = other is InternalServer && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt index 3facb182..ac7b351d 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt @@ -38,7 +38,7 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al ): HostView? { return hypervisors.asIterable() .filter { hv -> - val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long) + val fitsMemory = hv.availableMemory >= (server.image.meta["required-memory"] as Long) val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts index d7d5f002..31fcda2f 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -31,7 +31,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-compute:opendc-compute-service")) - api(project(":opendc-metal")) api(project(":opendc-simulator:opendc-simulator-compute")) api(project(":opendc-simulator:opendc-simulator-failures")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt deleted file mode 100644 index 2405a8f9..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Image -import org.opendc.compute.simulator.power.api.CpuPowerModel -import org.opendc.compute.simulator.power.api.Powerable -import org.opendc.compute.simulator.power.models.ConstantPowerModel -import org.opendc.metal.Node -import org.opendc.metal.NodeEvent -import org.opendc.metal.NodeState -import org.opendc.metal.driver.BareMetalDriver -import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.failures.FailureDomain -import org.opendc.utils.flow.EventFlow -import org.opendc.utils.flow.StateFlow -import java.time.Clock -import java.util.UUID - -/** - * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. - * - * @param coroutineScope The [CoroutineScope] the driver runs in. - * @param clock The virtual clock to keep track of time. - * @param uid The unique identifier of the machine. - * @param name An optional name of the machine. - * @param metadata The initial metadata of the node. - * @param machine The machine model to simulate. - * @param cpuPowerModel The CPU power model of this machine. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public class SimBareMetalDriver( - private val coroutineScope: CoroutineScope, - private val clock: Clock, - uid: UUID, - name: String, - metadata: Map<String, Any>, - machine: SimMachineModel, - cpuPowerModel: CpuPowerModel = ConstantPowerModel(0.0), -) : BareMetalDriver, FailureDomain, Powerable { - /** - * The flavor that corresponds to this machine. - */ - private val flavor = Flavor( - machine.cpus.size, - machine.memory.map { it.size }.sum() - ) - - /** - * The events of the machine. - */ - private val events = EventFlow<NodeEvent>() - - /** - * The machine state. - */ - private val nodeState = - StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, flavor, Image.EMPTY, events)) - - /** - * The [SimBareMetalMachine] we use to run the workload. - */ - private val machine = SimBareMetalMachine(coroutineScope, clock, machine) - - override val node: Flow<Node> = nodeState - - override val usage: Flow<Double> - get() = this.machine.usage - - override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this) - - /** - * The [Job] that runs the simulated workload. - */ - private var job: Job? = null - - override suspend fun init(): Node { - return nodeState.value - } - - override suspend fun start(): Node { - val node = nodeState.value - if (node.state != NodeState.SHUTOFF) { - return node - } - - val workload = node.image.tags["workload"] as SimWorkload - - job = coroutineScope.launch { - delay(1) // TODO Introduce boot time - initMachine() - try { - machine.run(workload, mapOf("driver" to this@SimBareMetalDriver, "node" to node)) - exitMachine(null) - } catch (_: CancellationException) { - // Ignored - } catch (cause: Throwable) { - exitMachine(cause) - } - } - - setNode(node.copy(state = NodeState.BOOT)) - return nodeState.value - } - - private fun initMachine() { - setNode(nodeState.value.copy(state = NodeState.ACTIVE)) - } - - private fun exitMachine(cause: Throwable?) { - val newNodeState = - if (cause == null) - NodeState.SHUTOFF - else - NodeState.ERROR - setNode(nodeState.value.copy(state = newNodeState)) - } - - override suspend fun stop(): Node { - val node = nodeState.value - if (node.state == NodeState.SHUTOFF) { - return node - } - - job?.cancelAndJoin() - setNode(node.copy(state = NodeState.SHUTOFF)) - return node - } - - override suspend fun reboot(): Node { - stop() - return start() - } - - override suspend fun setImage(image: Image): Node { - setNode(nodeState.value.copy(image = image)) - return nodeState.value - } - - override suspend fun refresh(): Node = nodeState.value - - private fun setNode(value: Node) { - val field = nodeState.value - if (field.state != value.state) { - events.emit(NodeEvent.StateChanged(value, field.state)) - } - - nodeState.value = value - } - - override val scope: CoroutineScope - get() = coroutineScope - - override suspend fun fail() { - setNode(nodeState.value.copy(state = NodeState.ERROR)) - } - - override suspend fun recover() { - setNode(nodeState.value.copy(state = NodeState.ACTIVE)) - } - - override fun toString(): String = "SimBareMetalDriver(node = ${nodeState.value.uid})" -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index fd547d3d..19fa3e97 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -29,34 +29,43 @@ import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* -import org.opendc.metal.Node +import org.opendc.compute.simulator.power.api.CpuPowerModel +import org.opendc.compute.simulator.power.api.Powerable +import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.simulator.compute.* import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.failures.FailureDomain import org.opendc.utils.flow.EventFlow +import java.time.Clock import java.util.* +import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. */ public class SimHost( - public val node: Node, - private val coroutineScope: CoroutineScope, - hypervisor: SimHypervisorProvider -) : Host, SimWorkload { + override val uid: UUID, + override val name: String, + model: SimMachineModel, + override val meta: Map<String, Any>, + context: CoroutineContext, + clock: Clock, + hypervisor: SimHypervisorProvider, + cpuPowerModel: CpuPowerModel = ConstantPowerModel(0.0), + private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper() +) : Host, FailureDomain, Powerable, AutoCloseable { /** - * The logger instance of this server. + * The [CoroutineScope] of the host bounded by the lifecycle of the host. */ - private val logger = KotlinLogging.logger {} + override val scope: CoroutineScope = CoroutineScope(context) /** - * The execution context in which the [Host] runs. + * The logger instance of this server. */ - private lateinit var ctx: SimExecutionContext + private val logger = KotlinLogging.logger {} override val events: Flow<HostEvent> get() = _events @@ -70,12 +79,17 @@ public class SimHost( /** * Current total memory use of the images on this hypervisor. */ - private var availableMemory: Long = 0 + private var availableMemory: Long = model.memory.map { it.size }.sum() + + /** + * The machine to run on. + */ + public val machine: SimBareMetalMachine = SimBareMetalMachine(scope, clock, model) /** * The hypervisor to run multiple workloads. */ - private val hypervisor = hypervisor.create( + public val hypervisor: SimHypervisor = hypervisor.create( object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, @@ -107,26 +121,40 @@ public class SimHost( */ private val guests = HashMap<Server, Guest>() - override val uid: UUID - get() = node.uid - - override val name: String - get() = node.name - - override val model: HostModel - get() = HostModel(node.flavor.cpuCount, node.flavor.memorySize) - override val state: HostState get() = _state - private var _state: HostState = HostState.UP + private var _state: HostState = HostState.DOWN set(value) { - listeners.forEach { it.onStateChanged(this, value) } + if (value != field) { + listeners.forEach { it.onStateChanged(this, value) } + } field = value } + override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) + + override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this) + + init { + // Launch hypervisor onto machine + scope.launch { + try { + _state = HostState.UP + machine.run(this@SimHost.hypervisor, emptyMap()) + } catch (_: CancellationException) { + // Ignored + } catch (cause: Throwable) { + logger.error(cause) { "Host failed" } + throw cause + } finally { + _state = HostState.DOWN + } + } + } + override fun canFit(server: Server): Boolean { val sufficientMemory = availableMemory > server.flavor.memorySize - val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount + val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) return sufficientMemory && enoughCpus && canFit @@ -146,7 +174,7 @@ public class SimHost( guest.start() } - _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override fun contains(server: Server): Boolean { @@ -163,7 +191,7 @@ public class SimHost( guest.stop() } - override suspend fun terminate(server: Server) { + override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return guest.terminate() } @@ -176,11 +204,16 @@ public class SimHost( listeners.remove(listener) } + override fun close() { + scope.cancel() + _state = HostState.DOWN + } + /** * Convert flavor to machine model. */ private fun Flavor.toMachineModel(): SimMachineModel { - val originalCpu = ctx.machine.cpus[0] + val originalCpu = machine.model.cpus[0] val processingNode = originalCpu.node.copy(coreCount = cpuCount) val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) @@ -190,7 +223,7 @@ public class SimHost( private fun onGuestStart(vm: Guest) { guests.forEach { _, guest -> - if (guest.state == ServerState.ACTIVE) { + if (guest.state == ServerState.RUNNING) { vm.performanceInterferenceModel?.onStart(vm.server.image.name) } } @@ -200,58 +233,71 @@ public class SimHost( private fun onGuestStop(vm: Guest) { guests.forEach { _, guest -> - if (guest.state == ServerState.ACTIVE) { + if (guest.state == ServerState.RUNNING) { vm.performanceInterferenceModel?.onStop(vm.server.image.name) } } listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) + } + + override suspend fun fail() { + _state = HostState.DOWN + } + + override suspend fun recover() { + _state = HostState.UP } /** * A virtual machine instance that the driver manages. */ private inner class Guest(val server: Server, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - var state: ServerState = ServerState.SHUTOFF + var state: ServerState = ServerState.TERMINATED suspend fun start() { when (state) { - ServerState.SHUTOFF -> { + ServerState.TERMINATED -> { logger.info { "User requested to start server ${server.uid}" } launch() } - ServerState.ACTIVE -> return + ServerState.RUNNING -> return + ServerState.DELETED -> { + logger.warn { "User tried to start terminated server" } + throw IllegalArgumentException("Server is terminated") + } else -> assert(false) { "Invalid state transition" } } } suspend fun stop() { when (state) { - ServerState.ACTIVE, ServerState.ERROR -> { + ServerState.RUNNING, ServerState.ERROR -> { val job = job ?: throw IllegalStateException("Server should be active") job.cancel() job.join() } - ServerState.SHUTOFF -> return + ServerState.TERMINATED, ServerState.DELETED -> return else -> assert(false) { "Invalid state transition" } } } suspend fun terminate() { stop() + state = ServerState.DELETED } private var job: Job? = null private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> assert(job == null) { "Concurrent job running" } - val workload = server.image.tags["workload"] as SimWorkload + val workload = mapper.createWorkload(server) - val job = coroutineScope.launch { + val job = scope.launch { delay(1) // TODO Introduce boot time init() cont.resume(Unit) @@ -271,14 +317,14 @@ public class SimHost( } private fun init() { - state = ServerState.ACTIVE + state = ServerState.RUNNING onGuestStart(this) } private fun exit(cause: Throwable?) { state = if (cause == null) - ServerState.SHUTOFF + ServerState.TERMINATED else ServerState.ERROR @@ -286,18 +332,4 @@ public class SimHost( onGuestStop(this) } } - - override fun onStart(ctx: SimExecutionContext) { - this.ctx = ctx - this.availableMemory = ctx.machine.memory.map { it.size }.sum() - this.hypervisor.onStart(ctx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return hypervisor.onStart(ctx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return hypervisor.onNext(ctx, cpu, remainingWork) - } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt deleted file mode 100644 index bb03777b..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.* -import org.opendc.compute.api.Image -import org.opendc.compute.service.driver.Host -import org.opendc.metal.Node -import org.opendc.metal.service.ProvisioningService -import org.opendc.simulator.compute.SimHypervisorProvider -import kotlin.coroutines.CoroutineContext - -/** - * A helper class to provision [SimHost]s on top of bare-metal machines using the [ProvisioningService]. - * - * @param context The [CoroutineContext] to use. - * @param metal The [ProvisioningService] to use. - * @param hypervisor The type of hypervisor to use. - */ -public class SimHostProvisioner( - private val context: CoroutineContext, - private val metal: ProvisioningService, - private val hypervisor: SimHypervisorProvider -) : AutoCloseable { - /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context) - - /** - * Provision all machines with a host. - */ - public suspend fun provisionAll(): List<Host> = coroutineScope { - metal.nodes().map { node -> async { provision(node) } }.awaitAll() - } - - /** - * Provision the specified [Node]. - */ - public suspend fun provision(node: Node): Host = coroutineScope { - val host = SimHost(node, scope, hypervisor) - metal.deploy(node, Image(node.uid, node.name, mapOf("workload" to host))) - host - } - - override fun close() { - scope.cancel() - } -} diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt index 30ce423c..c05f1a2c 100644 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt @@ -20,22 +20,16 @@ * SOFTWARE. */ -package org.opendc.metal +package org.opendc.compute.simulator + +import org.opendc.compute.api.Server +import org.opendc.simulator.compute.workload.SimWorkload /** - * An event that is emitted by a [Node]. + * A [SimWorkloadMapper] that maps a [Server] to a workload via the meta-data. */ -public sealed class NodeEvent { - /** - * The node that emitted the event. - */ - public abstract val node: Node - - /** - * This event is emitted when the state of [node] changes. - * - * @property node The node of which the state changed. - * @property previousState The previous state of the node. - */ - public data class StateChanged(override val node: Node, val previousState: NodeState) : NodeEvent() +public class SimMetaWorkloadMapper(private val key: String = "workload") : SimWorkloadMapper { + override fun createWorkload(server: Server): SimWorkload { + return requireNotNull(server.meta[key] ?: server.image.meta[key]) as SimWorkload + } } diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/Resource.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt index 5bb2c2ce..7082c5cf 100644 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/Resource.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,17 @@ * SOFTWARE. */ -package org.opendc.core.resource +package org.opendc.compute.simulator -import org.opendc.core.Identity +import org.opendc.compute.api.Server +import org.opendc.simulator.compute.workload.SimWorkload /** - * Represents a generic cloud resource. + * A [SimWorkloadMapper] is responsible for mapping a [Server] and [Image] to a [SimWorkload] that can be simulated. */ -public interface Resource : Identity { +public fun interface SimWorkloadMapper { /** - * The tags of this cloud resource. + * Map the specified [server] to a [SimWorkload] that can be simulated. */ - public val tags: TagContainer + public fun createWorkload(server: Server): SimWorkload } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt index 0141bc8c..604b69c0 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt @@ -2,7 +2,7 @@ package org.opendc.compute.simulator.power.api import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map -import org.opendc.metal.driver.BareMetalDriver +import org.opendc.compute.simulator.SimHost public interface CpuPowerModel { /** @@ -18,14 +18,14 @@ public interface CpuPowerModel { /** * Emits the values of power consumption for servers. * - * @param driver A [BareMetalDriver] that offers host CPU utilization. + * @param host A [SimHost] that offers host CPU utilization. * @param withoutIdle A [Boolean] flag indicates whether (false) add a constant * power consumption value when the server is idle or (true) not * with a default value being false. * @return A [Flow] of values representing the server power draw. */ - public fun getPowerDraw(driver: BareMetalDriver, withoutIdle: Boolean = false): Flow<Double> = - driver.usage.map { + public fun getPowerDraw(host: SimHost, withoutIdle: Boolean = false): Flow<Double> = + host.machine.usage.map { computeCpuPower(it) } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt deleted file mode 100644 index 0d90376e..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.withContext -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Image -import org.opendc.metal.NodeEvent -import org.opendc.metal.NodeState -import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.util.UUID - -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimBareMetalDriverTest { - private lateinit var machineModel: SimMachineModel - - @BeforeEach - fun setUp() { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - - machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 2000.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } - ) - } - - @Test - fun testFlopsWorkload() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - var finalState: NodeState = NodeState.UNKNOWN - var finalTime = 0L - - testScope.launch { - val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) - val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to SimFlopsWorkload(4_000, utilization = 1.0))) - // Batch driver commands - withContext(coroutineContext) { - driver.init() - driver.setImage(image) - val node = driver.start() - node.events.collect { event -> - when (event) { - is NodeEvent.StateChanged -> { - finalState = event.node.state - finalTime = clock.millis() - } - } - } - } - } - - testScope.advanceUntilIdle() - assertAll( - { assertEquals(NodeState.SHUTOFF, finalState) }, - { assertEquals(501, finalTime) } - ) - } -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 61bff39f..e1a1d87e 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,7 +24,6 @@ package org.opendc.compute.simulator import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -39,8 +38,6 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher import org.opendc.compute.service.driver.HostEvent -import org.opendc.metal.Node -import org.opendc.metal.NodeState import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -82,18 +79,13 @@ internal class SimHostTest { var grantedWork = 0L var overcommittedWork = 0L - val node = Node( - UUID.randomUUID(), "name", emptyMap(), NodeState.SHUTOFF, - Flavor(machineModel.cpus.size, machineModel.memory.map { it.size }.sum()), Image.EMPTY, emptyFlow() - ) - scope.launch { - val virtDriver = SimHost(node, this, SimFairShareHypervisorProvider()) - val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver)) + val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider()) val duration = 5 * 60L - val vmImageA = Image( + val vmImageA = MockImage( UUID.randomUUID(), "<unnamed>", + emptyMap(), mapOf( "workload" to SimTraceWorkload( sequenceOf( @@ -105,9 +97,10 @@ internal class SimHostTest { ) ) ) - val vmImageB = Image( + val vmImageB = MockImage( UUID.randomUUID(), "<unnamed>", + emptyMap(), mapOf( "workload" to SimTraceWorkload( sequenceOf( @@ -120,16 +113,9 @@ internal class SimHostTest { ) ) - val metalDriver = - SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) - - metalDriver.init() - metalDriver.setImage(vmm) - metalDriver.start() - delay(5) - val flavor = Flavor(2, 0) + val flavor = MockFlavor(2, 0) virtDriver.events .onEach { event -> when (event) { @@ -157,14 +143,56 @@ internal class SimHostTest { ) } + private class MockFlavor( + override val cpuCount: Int, + override val memorySize: Long + ) : Flavor { + override val uid: UUID = UUID.randomUUID() + override val name: String = "test" + override val labels: Map<String, String> = emptyMap() + override val meta: Map<String, Any> = emptyMap() + + override suspend fun delete() { + throw NotImplementedError() + } + + override suspend fun refresh() { + throw NotImplementedError() + } + } + + private class MockImage( + override val uid: UUID, + override val name: String, + override val labels: Map<String, String>, + override val meta: Map<String, Any> + ) : Image { + override suspend fun delete() { + throw NotImplementedError() + } + + override suspend fun refresh() { + throw NotImplementedError() + } + } + private class MockServer( override val uid: UUID, override val name: String, override val flavor: Flavor, override val image: Image ) : Server { - override val tags: Map<String, String> = emptyMap() - override val state: ServerState = ServerState.BUILD + override val labels: Map<String, String> = emptyMap() + + override val meta: Map<String, Any> = emptyMap() + + override val state: ServerState = ServerState.TERMINATED + + override suspend fun start() {} + + override suspend fun stop() {} + + override suspend fun delete() {} override fun watch(watcher: ServerWatcher) {} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt deleted file mode 100644 index 33b3db94..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.opendc.compute.api.Image -import org.opendc.metal.service.SimpleProvisioningService -import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.util.UUID - -/** - * Test suite for the [SimpleProvisioningService]. - */ -@OptIn(ExperimentalCoroutinesApi::class) -internal class SimProvisioningServiceTest { - private lateinit var machineModel: SimMachineModel - - @BeforeEach - fun setUp() { - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - - machineModel = SimMachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 2000.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } - ) - } - - /** - * A basic smoke test. - */ - @Test - fun testSmoke() { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - testScope.launch { - val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("machine" to SimFlopsWorkload(1000))) - val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel) - - val provisioner = SimpleProvisioningService() - provisioner.create(driver) - delay(5) - val nodes = provisioner.nodes() - val node = provisioner.deploy(nodes.first(), image) - node.events.collect { println(it) } - } - - testScope.advanceUntilIdle() - } -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt index d4d88fb1..9d034a5d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt @@ -1,18 +1,21 @@ package org.opendc.compute.simulator.power import io.mockk.* +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.* -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource +import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.power.api.CpuPowerModel import org.opendc.compute.simulator.power.models.* -import org.opendc.metal.driver.BareMetalDriver +import org.opendc.simulator.compute.SimBareMetalMachine import java.util.stream.Stream import kotlin.math.pow +@OptIn(ExperimentalCoroutinesApi::class) internal class CpuPowerModelTest { private val epsilon = 10.0.pow(-3) private val cpuUtil = .9 @@ -44,21 +47,24 @@ internal class CpuPowerModelTest { powerModel: CpuPowerModel, expectedPowerConsumption: Double ) { - val cpuLoads = flowOf(cpuUtil, cpuUtil, cpuUtil) - val bareMetalDriver = mockkClass(BareMetalDriver::class) - every { bareMetalDriver.usage } returns cpuLoads + runBlockingTest { + val cpuLoads = flowOf(cpuUtil, cpuUtil, cpuUtil).stateIn(this) + val bareMetalDriver = mockkClass(SimHost::class) + val machine = mockkClass(SimBareMetalMachine::class) + every { bareMetalDriver.machine } returns machine + every { machine.usage } returns cpuLoads - runBlocking { val serverPowerDraw = powerModel.getPowerDraw(bareMetalDriver) - assertEquals(serverPowerDraw.count(), cpuLoads.count()) assertEquals( serverPowerDraw.first().toDouble(), flowOf(expectedPowerConsumption).first().toDouble(), epsilon ) + + verify(exactly = 1) { bareMetalDriver.machine } + verify(exactly = 1) { machine.usage } } - verify(exactly = 1) { bareMetalDriver.usage } } @Suppress("unused") diff --git a/simulator/opendc-core/build.gradle.kts b/simulator/opendc-core/build.gradle.kts deleted file mode 100644 index 7e1a4b97..00000000 --- a/simulator/opendc-core/build.gradle.kts +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2017 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Base model for datacenter simulation" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(platform(project(":opendc-platform"))) - api("org.jetbrains.kotlinx:kotlinx-coroutines-core") -} diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt deleted file mode 100644 index a5055cff..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core - -/** - * A description of a large-scale computing environment. This description includes including key size and topology - * information of the environment, types of resources, but also various operational and management rules such as - * scheduled maintenance, allocation and other constraints. - * - * @property name The name of the environment. - * @property description A small textual description about the environment that is being modeled. - * @property platforms The cloud platforms (such as AWS or GCE) in this environment. - */ -public data class Environment(val name: String, val description: String?, val platforms: List<Platform>) diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt deleted file mode 100644 index 5550ffed..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core - -import java.util.* - -/** - * A representation of a cloud platform such as Amazon Web Services (AWS), Microsoft Azure or Google Cloud. - * - * @property uid The unique identifier of this topology. - * @property name the name of the platform. - * @property zones The availability zones available on this platform. - */ -public data class Platform(override val uid: UUID, override val name: String, val zones: List<Zone>) : Identity diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt deleted file mode 100644 index 834f6cf2..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core - -import org.opendc.core.services.ServiceRegistry -import java.util.* - -/** - * An isolated location within a topology region from which public cloud services operate, roughly equivalent to a - * single topology. Zones contain one or more clusters and secondary storage. - * - * This class models *only* the static information of a zone, with dynamic information being contained within the zone's - * actor. During runtime, it's actor acts as a registry for all the cloud services provided by the zone. - * - * @property uid The unique identifier of this availability zone. - * @property name The name of the zone within its platform. - * @property services The service registry containing the services of the zone. - */ -public data class Zone( - override val uid: UUID, - override val name: String, - val services: ServiceRegistry -) : Identity { - override fun equals(other: Any?): Boolean = other is Zone && uid == other.uid - override fun hashCode(): Int = uid.hashCode() -} diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt deleted file mode 100644 index 6a4ff102..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core.resource - -/** - * An immutable map containing the tags of some resource. - */ -public typealias TagContainer = Map<String, Any> - -/** - * Obtain the value of the tag with the specified [key] of type [T]. If the tag does not exist or the tag is of - * different type, `null` is returned. - */ -public inline fun <reified T : Any> TagContainer.typed(key: String): T? = this[key] as? T diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt deleted file mode 100644 index 7434d91c..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core.services - -/** - * An immutable service registry interface. - */ -public interface ServiceRegistry { - /** - * The keys in this registry. - */ - public val keys: Collection<ServiceKey<*>> - - /** - * Determine if this map contains the service with the specified [ServiceKey]. - * - * @param key The key of the service to check for. - * @return `true` if the service is in the map, `false` otherwise. - */ - public operator fun contains(key: ServiceKey<*>): Boolean - - /** - * Obtain the service with the specified [ServiceKey]. - * - * @param key The key of the service to obtain. - * @return The references to the service. - * @throws IllegalArgumentException if the key does not exist in the map. - */ - public operator fun <T : Any> get(key: ServiceKey<T>): T - - /** - * Return the result of associating the specified [service] with the given [key] in this registry. - */ - public fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry -} - -/** - * Construct an empty [ServiceRegistry]. - */ -@Suppress("FunctionName") -public fun ServiceRegistry(): ServiceRegistry = ServiceRegistryImpl(emptyMap()) diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt deleted file mode 100644 index e117bec6..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core.services - -/** - * Default implementation of the [ServiceRegistry] interface. - */ -internal class ServiceRegistryImpl(private val map: Map<ServiceKey<*>, Any>) : ServiceRegistry { - override val keys: Collection<ServiceKey<*>> - get() = map.keys - - override fun contains(key: ServiceKey<*>): Boolean = key in map - - override fun <T : Any> get(key: ServiceKey<T>): T { - @Suppress("UNCHECKED_CAST") - return map[key] as T - } - - override fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry = - ServiceRegistryImpl(map.plus(key to service)) - - override fun toString(): String = map.toString() -} diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt deleted file mode 100644 index f0bd1137..00000000 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.core.workload - -import org.opendc.core.Identity -import org.opendc.core.User - -/** - * A high-level abstraction that represents the actual work that a set of compute resources perform, such - * as running an application on a machine or a whole workflow running multiple tasks on numerous machines. - */ -public interface Workload : Identity { - /** - * The owner of this workload. - */ - public val owner: User -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 636f291c..2d0da1bf 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -31,7 +31,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-harness")) implementation(project(":opendc-format")) implementation(project(":opendc-simulator:opendc-simulator-core")) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index a5cf4fc0..f327b55d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.experiments.capelin.experiment +package org.opendc.experiments.capelin import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -32,30 +32,24 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.driver.HostListener +import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.SimHost -import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader -import org.opendc.metal.NODE_CLUSTER -import org.opendc.metal.NodeEvent -import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector -import org.opendc.simulator.failures.FailureDomain import org.opendc.simulator.failures.FaultInjector import org.opendc.trace.core.EventTracer import java.io.File @@ -72,20 +66,20 @@ private val logger = KotlinLogging.logger {} /** * Construct the failure domain for the experiments. */ -public suspend fun createFailureDomain( +public fun createFailureDomain( coroutineScope: CoroutineScope, clock: Clock, seed: Int, failureInterval: Double, - bareMetalProvisioner: ProvisioningService, + service: ComputeService, chan: Channel<Unit> ): CoroutineScope { val job = coroutineScope.launch { chan.receive() val random = Random(seed) val injectors = mutableMapOf<String, FaultInjector>() - for (node in bareMetalProvisioner.nodes()) { - val cluster = node.metadata[NODE_CLUSTER] as String + for (host in service.hosts) { + val cluster = host.meta["cluster"] as String val injector = injectors.getOrPut(cluster) { createFaultInjector( @@ -95,7 +89,7 @@ public suspend fun createFailureDomain( failureInterval ) } - injector.enqueue(node.metadata["driver"] as FailureDomain) + injector.enqueue(host as SimHost) } } return CoroutineScope(coroutineScope.coroutineContext + job) @@ -139,41 +133,39 @@ public fun createTraceReader( ) } -public data class ProvisionerResult( - val metal: ProvisioningService, - val provisioner: SimHostProvisioner, - val compute: ComputeServiceImpl -) - /** - * Construct the environment for a VM provisioner and return the provisioner instance. + * Construct the environment for a simulated compute service.. */ -public suspend fun createProvisioner( +public fun createComputeService( coroutineScope: CoroutineScope, clock: Clock, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, eventTracer: EventTracer -): ProvisionerResult { - val environment = environmentReader.use { it.construct(coroutineScope, clock) } - val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider()) - val hosts = provisioner.provisionAll() +): ComputeServiceImpl { + val hosts = environmentReader + .use { it.read() } + .map { def -> + SimHost( + def.uid, + def.name, + def.model, + def.meta, + coroutineScope.coroutineContext, + clock, + SimFairShareHypervisorProvider(), + def.powerModel + ) + } - val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + val scheduler = + ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl for (host in hosts) { scheduler.addHost(host) } - // Wait for the hypervisors to be spawned - delay(10) - - return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler) + return scheduler } /** @@ -186,25 +178,16 @@ public fun attachMonitor( scheduler: ComputeService, monitor: ExperimentMonitor ) { - - val hypervisors = scheduler.hosts - - // Monitor hypervisor events - for (hypervisor in hypervisors) { - // TODO Do not expose Host directly but use Hypervisor class. - val server = (hypervisor as SimHost).node - monitor.reportHostStateChange(clock.millis(), hypervisor, server) - server.events - .onEach { event -> - val time = clock.millis() - when (event) { - is NodeEvent.StateChanged -> { - monitor.reportHostStateChange(time, hypervisor, event.node) - } - } + // Monitor host events + for (host in scheduler.hosts) { + monitor.reportHostStateChange(clock.millis(), host, HostState.UP) + host.addListener(object : HostListener { + override fun onStateChanged(host: Host, newState: HostState) { + monitor.reportHostStateChange(clock.millis(), host, newState) } - .launchIn(coroutineScope) - hypervisor.events + }) + + host.events .onEach { event -> when (event) { is HostEvent.SliceFinished -> monitor.reportHostSlice( @@ -216,15 +199,14 @@ public fun attachMonitor( event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - (event.driver as SimHost).node + event.driver ) } } .launchIn(coroutineScope) - val driver = server.metadata["driver"] as SimBareMetalDriver - driver.powerDraw - .onEach { monitor.reportPowerConsumption(server, it) } + (host as SimHost).powerDraw + .onEach { monitor.reportPowerConsumption(host, it) } .launchIn(coroutineScope) } @@ -244,29 +226,32 @@ public fun attachMonitor( public suspend fun processTrace( coroutineScope: CoroutineScope, clock: Clock, - reader: TraceReader<ComputeWorkload>, + reader: TraceReader<SimWorkload>, scheduler: ComputeService, chan: Channel<Unit>, monitor: ExperimentMonitor ) { val client = scheduler.newClient() + val image = client.newImage("vm-image") try { var submitted = 0 while (reader.hasNext()) { - val (time, workload) = reader.next() + val entry = reader.next() submitted++ - delay(max(0, time - clock.millis())) + delay(max(0, entry.start - clock.millis())) coroutineScope.launch { chan.send(Unit) val server = client.newServer( - workload.image.name, - workload.image, - Flavor( - workload.image.tags["cores"] as Int, - workload.image.tags["required-memory"] as Long - ) + entry.name, + image, + client.newFlavor( + entry.name, + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta ) server.watch(object : ServerWatcher { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index ff0a026d..f9c96bb6 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -35,10 +35,6 @@ import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolic import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* -import org.opendc.experiments.capelin.experiment.attachMonitor -import org.opendc.experiments.capelin.experiment.createFailureDomain -import org.opendc.experiments.capelin.experiment.createProvisioner -import org.opendc.experiments.capelin.experiment.processTrace import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -157,7 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { ) testScope.launch { - val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( + val scheduler = createComputeService( this, clock, environment, @@ -172,7 +168,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { clock, seeder.nextInt(), operationalPhenomena.failureFrequency, - bareMetalProvisioner, + scheduler, chan ) } else { @@ -197,7 +193,6 @@ public abstract class Portfolio(name: String) : Experiment(name) { failureDomain?.cancel() scheduler.close() - provisioner.close() } try { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 1e42cf56..14cc06dc 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -26,7 +26,7 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host -import org.opendc.metal.Node +import org.opendc.compute.service.driver.HostState import java.io.Closeable /** @@ -41,17 +41,12 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked when the state of a host changes. */ - public fun reportHostStateChange( - time: Long, - driver: Host, - host: Node - ) { - } + public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {} /** * Report the power consumption of a host. */ - public fun reportPowerConsumption(host: Node, draw: Double) {} + public fun reportPowerConsumption(host: Host, draw: Double) {} /** * This method is invoked for a host for each slice that is finishes. @@ -65,7 +60,7 @@ public interface ExperimentMonitor : Closeable { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - host: Node, + host: Host, duration: Long = 5 * 60 * 1000L ) { } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index 98052214..c9d57a98 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -27,11 +27,11 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.telemetry.HostEvent import org.opendc.experiments.capelin.telemetry.ProvisionerEvent import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter -import org.opendc.metal.Node import java.io.File /** @@ -51,7 +51,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: File(base, "provisioner-metrics/$partition/data.parquet"), bufferSize ) - private val currentHostEvent = mutableMapOf<Node, HostEvent>() + private val currentHostEvent = mutableMapOf<Host, HostEvent>() private var startTime = -1L override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { @@ -63,12 +63,8 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } } - override fun reportHostStateChange( - time: Long, - driver: Host, - host: Node - ) { - logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } + override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { + logger.debug { "Host ${host.uid} changed state $newState [$time]" } val previousEvent = currentHostEvent[host] @@ -97,9 +93,9 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: ) } - private val lastPowerConsumption = mutableMapOf<Node, Double>() + private val lastPowerConsumption = mutableMapOf<Host, Double>() - override fun reportPowerConsumption(host: Node, draw: Double) { + override fun reportPowerConsumption(host: Host, draw: Double) { lastPowerConsumption[host] = draw } @@ -112,7 +108,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - host: Node, + host: Host, duration: Long ) { val previousEvent = currentHostEvent[host] @@ -130,7 +126,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -148,7 +144,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -168,7 +164,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt index e7b6a7bb..899fc9b1 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.telemetry -import org.opendc.metal.Node +import org.opendc.compute.service.driver.Host /** * A periodic report of the host machine metrics. @@ -30,7 +30,7 @@ import org.opendc.metal.Node public data class HostEvent( override val timestamp: Long, public val duration: Long, - public val node: Node, + public val host: Host, public val vmCount: Int, public val requestedBurst: Long, public val grantedBurst: Long, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt index b4fdd66a..4a3e7963 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : // record.put("portfolio_id", event.run.parent.parent.id) // record.put("scenario_id", event.run.parent.id) // record.put("run_id", event.run.id) - record.put("host_id", event.node.name) - record.put("state", event.node.state.name) + record.put("host_id", event.host.name) + record.put("state", event.host.state.name) record.put("timestamp", event.timestamp) record.put("duration", event.duration) record.put("vm_count", event.vmCount) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt index f9630078..a8462a51 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt @@ -22,14 +22,13 @@ package org.opendc.experiments.capelin.trace -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimWorkload import java.util.TreeSet /** @@ -45,11 +44,11 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, workload: Workload, seed: Int -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The iterator over the actual trace. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> = + private val iterator: Iterator<TraceEntry<SimWorkload>> = rawReaders .map { it.read() } .run { @@ -67,19 +66,11 @@ public class Sc20ParquetTraceReader( this else { map { entry -> - val image = entry.workload.image - val id = image.name + val id = entry.name val relevantPerformanceInterferenceModelItems = performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - val newImage = - Image( - image.uid, - image.name, - image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - ) - val newWorkload = entry.workload.copy(image = newImage) - Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) + entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems)) } } } @@ -87,7 +78,7 @@ public class Sc20ParquetTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt index b29bdc54..7ea5efe5 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -26,12 +26,10 @@ import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.File import java.util.UUID @@ -48,6 +46,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { * Read the fragments into memory. */ private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> { + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet")) .disableCompatibility() .build() @@ -59,11 +58,9 @@ public class Sc20RawParquetTraceReader(private val path: File) { val record = reader.read() ?: break val id = record["id"].toString() - val tick = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( duration, @@ -83,13 +80,14 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntryImpl> { + private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { + @Suppress("DEPRECATION") val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet")) .disableCompatibility() .build() var counter = 0 - val entries = mutableListOf<TraceEntryImpl>() + val entries = mutableListOf<TraceEntry<SimWorkload>>() return try { while (true) { @@ -109,13 +107,9 @@ public class Sc20RawParquetTraceReader(private val path: File) { val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs val workload = SimTraceWorkload(vmFragments) - val vmWorkload = ComputeWorkload( - uid, - id, - UnnamedUser, - Image( - uid, - id, + entries.add( + TraceEntry( + uid, id, submissionTime, workload, mapOf( "submit-time" to submissionTime, "end-time" to endTime, @@ -126,7 +120,6 @@ public class Sc20RawParquetTraceReader(private val path: File) { ) ) ) - entries.add(TraceEntryImpl(submissionTime, vmWorkload)) } entries @@ -141,7 +134,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * The entries in the trace. */ - private val entries: List<TraceEntryImpl> + private val entries: List<TraceEntry<SimWorkload>> init { val fragments = parseFragments(path) @@ -151,21 +144,5 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the entries in the trace. */ - public fun read(): List<TraceEntry<ComputeWorkload>> = entries - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - internal data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> + public fun read(): List<TraceEntry<SimWorkload>> = entries } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index c588fda3..9ab69572 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -31,14 +31,12 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.File import java.io.Serializable import java.util.SortedSet @@ -62,11 +60,11 @@ public class Sc20StreamingParquetTraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * The intermediate buffer to store the read records in. @@ -98,6 +96,7 @@ public class Sc20StreamingParquetTraceReader( * The thread to read the records in. */ private val readerThread = thread(start = true, name = "sc20-reader") { + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } @@ -113,11 +112,9 @@ public class Sc20StreamingParquetTraceReader( } val id = record["id"].toString() - val tick = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( duration, @@ -167,6 +164,7 @@ public class Sc20StreamingParquetTraceReader( val entries = mutableMapOf<String, GenericData.Record>() val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>() + @Suppress("DEPRECATION") val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } @@ -236,35 +234,25 @@ public class Sc20StreamingParquetTraceReader( Random(random.nextInt()) ) val workload = SimTraceWorkload(fragments) - val vmWorkload = ComputeWorkload( - uid, - "VM Workload $id", - UnnamedUser, - Image( - uid, - id, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - TraceEntryImpl( - submissionTime, - vmWorkload + TraceEntry( + uid, id, submissionTime, workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to maxCores, + "required-memory" to requiredMemory, + "workload" to workload + ) ) } - .sortedBy { it.submissionTime } + .sortedBy { it.start } .toList() .iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() { readerThread.interrupt() @@ -287,20 +275,4 @@ public class Sc20StreamingParquetTraceReader( return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() } } - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index 881652f6..5c8727ea 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -23,12 +23,11 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry +import org.opendc.simulator.compute.workload.SimWorkload import java.util.* import kotlin.random.Random @@ -38,11 +37,11 @@ private val logger = KotlinLogging.logger {} * Sample the workload for the specified [run]. */ public fun sampleWorkload( - trace: List<TraceEntry<ComputeWorkload>>, + trace: List<TraceEntry<SimWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<ComputeWorkload>> { +): List<TraceEntry<SimWorkload>> { return when { workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) workload.samplingStrategy == SamplingStrategy.HPC -> @@ -58,24 +57,24 @@ public fun sampleWorkload( * Sample a regular (non-HPC) workload. */ public fun sampleRegularWorkload( - trace: List<TraceEntry<ComputeWorkload>>, + trace: List<TraceEntry<SimWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<ComputeWorkload>> { +): List<TraceEntry<SimWorkload>> { val fraction = subWorkload.fraction val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() val totalLoad = if (workload is CompositeWorkload) { workload.totalLoad } else { - shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + shuffled.sumByDouble { it.meta.getValue("total-load") as Double } } var currentLoad = 0.0 for (entry in shuffled) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + val entryLoad = entry.meta.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -93,23 +92,23 @@ public fun sampleRegularWorkload( * Sample a HPC workload. */ public fun sampleHpcWorkload( - trace: List<TraceEntry<ComputeWorkload>>, + trace: List<TraceEntry<SimWorkload>>, workload: Workload, seed: Int, sampleOnLoad: Boolean -): List<TraceEntry<ComputeWorkload>> { +): List<TraceEntry<SimWorkload>> { val pattern = Regex("^vm__workload__(ComputeNode|cn).*") val random = Random(seed) val fraction = workload.fraction val (hpc, nonHpc) = trace.partition { entry -> - val name = entry.workload.image.name + val name = entry.name name.matches(pattern) } val hpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() hpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -118,7 +117,7 @@ public fun sampleHpcWorkload( val nonHpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() nonHpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -130,7 +129,7 @@ public fun sampleHpcWorkload( val totalLoad = if (workload is CompositeWorkload) { workload.totalLoad } else { - trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + trace.sumByDouble { it.meta.getValue("total-load") as Double } } logger.debug { "Total trace load: $totalLoad" } @@ -139,12 +138,12 @@ public fun sampleHpcWorkload( var nonHpcCount = 0 var nonHpcLoad = 0.0 - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() if (sampleOnLoad) { var currentLoad = 0.0 for (entry in hpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + val entryLoad = entry.meta.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -156,7 +155,7 @@ public fun sampleHpcWorkload( } for (entry in nonHpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + val entryLoad = entry.meta.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > 1) { break } @@ -170,7 +169,7 @@ public fun sampleHpcWorkload( hpcSequence .take((fraction * trace.size).toInt()) .forEach { entry -> - hpcLoad += entry.workload.image.tags.getValue("total-load") as Double + hpcLoad += entry.meta.getValue("total-load") as Double hpcCount += 1 res.add(entry) } @@ -178,7 +177,7 @@ public fun sampleHpcWorkload( nonHpcSequence .take(((1 - fraction) * trace.size).toInt()) .forEach { entry -> - nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double + nonHpcLoad += entry.meta.getValue("total-load") as Double nonHpcCount += 1 res.add(entry) } @@ -194,16 +193,7 @@ public fun sampleHpcWorkload( /** * Sample a random trace entry. */ -private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> { - val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = Image( - id, - entry.workload.image.name, - entry.workload.image.tags - ) - val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) - return VmTraceEntry(vmWorkload, entry.submissionTime) +private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> { + val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) + return entry.copy(uid = uid) } - -private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) : - TraceEntry<ComputeWorkload> diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index dfc6b90b..4e6cfddc 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,13 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy -import org.opendc.experiments.capelin.experiment.attachMonitor -import org.opendc.experiments.capelin.experiment.createFailureDomain -import org.opendc.experiments.capelin.experiment.createProvisioner -import org.opendc.experiments.capelin.experiment.processTrace import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader @@ -46,7 +42,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader -import org.opendc.metal.Node +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import java.io.File @@ -101,15 +97,13 @@ class CapelinIntegrationTest { val tracer = EventTracer(clock) testScope.launch { - val res = createProvisioner( + scheduler = createComputeService( this, clock, environmentReader, allocationPolicy, tracer ) - val bareMetalProvisioner = res.metal - scheduler = res.compute val failureDomain = if (failures) { println("ENABLING failures") @@ -118,7 +112,7 @@ class CapelinIntegrationTest { clock, seed, 24.0 * 7, - bareMetalProvisioner, + scheduler, chan ) } else { @@ -140,7 +134,6 @@ class CapelinIntegrationTest { failureDomain?.cancel() scheduler.close() monitor.close() - res.provisioner.close() } runSimulation() @@ -166,7 +159,7 @@ class CapelinIntegrationTest { val tracer = EventTracer(clock) testScope.launch { - val (_, provisioner, scheduler) = createProvisioner( + val scheduler = createComputeService( this, clock, environmentReader, @@ -187,7 +180,6 @@ class CapelinIntegrationTest { scheduler.close() monitor.close() - provisioner.close() } runSimulation() @@ -209,7 +201,7 @@ class CapelinIntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> { + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { return Sc20ParquetTraceReader( listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), emptyMap(), @@ -241,7 +233,7 @@ class CapelinIntegrationTest { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - host: Node, + host: Host, duration: Long ) { totalRequestedBurst += requestedBurst diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index 00aa0395..02e77c7c 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -30,10 +30,9 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-harness")) implementation(project(":opendc-format")) - implementation(project(":opendc-workflows")) + implementation(project(":opendc-workflow:opendc-workflow-service")) implementation(project(":opendc-simulator:opendc-simulator-core")) implementation(project(":opendc-compute:opendc-compute-simulator")) } diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 7b9d70ed..9e305b3d 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -26,23 +26,22 @@ import kotlinx.coroutines.* import kotlinx.coroutines.test.TestCoroutineScope import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.simulator.SimHostProvisioner +import org.opendc.compute.simulator.SimHost import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf -import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import org.opendc.trace.core.enable -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.WorkflowEvent -import org.opendc.workflows.service.WorkflowSchedulerMode -import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy -import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy +import org.opendc.workflow.service.WorkflowEvent +import org.opendc.workflow.service.WorkflowService +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy import java.io.File import java.io.FileInputStream import kotlin.math.max @@ -84,16 +83,20 @@ public class UnderspecificationExperiment : Experiment("underspecification") { } testScope.launch { - val environment = Sc18EnvironmentReader(FileInputStream(File(environment))) - .use { it.construct(testScope, clock) } + val hosts = Sc18EnvironmentReader(FileInputStream(File(environment))) + .use { it.read() } + .map { def -> + SimHost( + def.uid, + def.name, + def.model, + def.meta, + testScope.coroutineContext, + clock, + SimSpaceSharedHypervisorProvider() + ) + } - val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) - - val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) - val hosts = provisioner.provisionAll() val compute = ComputeService( testScope.coroutineContext, clock, @@ -103,11 +106,8 @@ public class UnderspecificationExperiment : Experiment("underspecification") { hosts.forEach { compute.addHost(it) } - // Wait for the hypervisors to be spawned - delay(10) - - val scheduler = StageWorkflowService( - testScope, + val scheduler = WorkflowService( + testScope.coroutineContext, clock, tracer, compute.newClient(), @@ -121,9 +121,9 @@ public class UnderspecificationExperiment : Experiment("underspecification") { val reader = GwfTraceReader(File(trace)) while (reader.hasNext()) { - val (time, job) = reader.next() - delay(max(0, time * 1000 - clock.millis())) - scheduler.submit(job) + val entry = reader.next() + delay(max(0, entry.start * 1000 - clock.millis())) + scheduler.submit(entry.workload) } } diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt index dbd04b87..a8356888 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt @@ -24,7 +24,7 @@ package org.opendc.experiments.sc18 import org.opendc.trace.core.EventStream import org.opendc.trace.core.onEvent -import org.opendc.workflows.service.WorkflowEvent +import org.opendc.workflow.service.WorkflowEvent import java.util.* import kotlin.coroutines.resume import kotlin.coroutines.suspendCoroutine diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts index 37e9c9c8..385e556d 100644 --- a/simulator/opendc-format/build.gradle.kts +++ b/simulator/opendc-format/build.gradle.kts @@ -30,9 +30,8 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-workflows")) + api(project(":opendc-workflow:opendc-workflow-api")) implementation(project(":opendc-simulator:opendc-simulator-compute")) implementation(project(":opendc-compute:opendc-compute-simulator")) api("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt index 1f73bb61..97d6f239 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt @@ -22,17 +22,14 @@ package org.opendc.format.environment -import kotlinx.coroutines.CoroutineScope -import org.opendc.core.Environment import java.io.Closeable -import java.time.Clock /** - * An interface for reading descriptions of topology environments into memory as [Environment]. + * An interface for reading descriptions of topology environments into memory. */ public interface EnvironmentReader : Closeable { /** - * Construct an [Environment] in the specified [CoroutineScope]. + * Read the environment into a list. */ - public suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment + public fun read(): List<MachineDef> } diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Identity.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt index 252c40f5..b5b3b84b 100644 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Identity.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,21 +20,16 @@ * SOFTWARE. */ -package org.opendc.core +package org.opendc.format.environment +import org.opendc.compute.simulator.power.api.CpuPowerModel +import org.opendc.simulator.compute.SimMachineModel import java.util.* -/** - * An object that has a unique identity. - */ -public interface Identity { - /** - * A unique, opaque, system-generated value, representing the object. - */ - public val uid: UUID - - /** - * A non-empty, human-readable string representing the object. - */ - public val name: String -} +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: SimMachineModel, + val powerModel: CpuPowerModel +) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index bbbbe87c..3da8d0b3 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -25,21 +25,14 @@ package org.opendc.format.environment.sc18 import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry +import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream -import java.time.Clock import java.util.* /** @@ -55,9 +48,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + /** + * Read the environment. + */ + public override fun read(): List<MachineDef> { var counter = 0 - val nodes = setup.rooms.flatMap { room -> + return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> when (roomObject) { is RoomObject.Rack -> { @@ -75,35 +71,18 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimBareMetalDriver( - coroutineScope, - clock, - UUID.randomUUID(), - "node-${counter++}", + MachineDef( + UUID(0L, counter++.toLong()), + "node-$counter", emptyMap(), - SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))) + SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), + ConstantPowerModel(0.0) ) } } } } } - - val provisioningService = SimpleProvisioningService() - for (node in nodes) { - provisioningService.create(node) - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - val platform = Platform( - UUID.randomUUID(), - "sc18-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(setup.name, null, listOf(platform)) } override fun close() {} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 998f9cd6..9a06a40f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -22,17 +22,9 @@ package org.opendc.format.environment.sc20 -import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.NODE_CLUSTER -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -40,7 +32,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit import java.io.File import java.io.FileInputStream import java.io.InputStream -import java.time.Clock import java.util.* /** @@ -54,8 +45,7 @@ public class Sc20ClusterEnvironmentReader( public constructor(file: File) : this(FileInputStream(file)) - @Suppress("BlockingMethodInNonBlockingContext") - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + public override fun read(): List<MachineDef> { var clusterIdCol = 0 var speedCol = 0 var numberOfHostsCol = 0 @@ -69,7 +59,7 @@ public class Sc20ClusterEnvironmentReader( var memoryPerHost: Long var coresPerHost: Int - val nodes = mutableListOf<SimBareMetalDriver>() + val nodes = mutableListOf<MachineDef>() val random = Random(0) input.bufferedReader().use { reader -> @@ -103,12 +93,10 @@ public class Sc20ClusterEnvironmentReader( repeat(numberOfHosts) { nodes.add( - SimBareMetalDriver( - coroutineScope, - clock, + MachineDef( UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", - mapOf(NODE_CLUSTER to clusterId), + mapOf("cluster" to clusterId), SimMachineModel( List(coresPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, speed) @@ -125,22 +113,7 @@ public class Sc20ClusterEnvironmentReader( } } - val provisioningService = SimpleProvisioningService() - for (node in nodes) { - provisioningService.create(node) - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "sc20-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment("SC20 Environment", null, listOf(platform)) + return nodes } override fun close() {} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 6cf65f7f..effd0286 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -25,22 +25,14 @@ package org.opendc.format.environment.sc20 import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream -import java.time.Clock import java.util.* /** @@ -55,9 +47,12 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + /** + * Read the environment. + */ + public override fun read(): List<MachineDef> { var counter = 0 - val nodes = setup.rooms.flatMap { room -> + return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> when (roomObject) { is RoomObject.Rack -> { @@ -81,11 +76,9 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimBareMetalDriver( - coroutineScope, - clock, - UUID.randomUUID(), - "node-${counter++}", + MachineDef( + UUID(0L, counter++.toLong()), + "node-$counter", emptyMap(), SimMachineModel(cores, memories), // For now we assume a simple linear load model with an idle draw of ~200W and a maximum @@ -98,23 +91,6 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja } } } - - val provisioningService = SimpleProvisioningService() - for (node in nodes) { - provisioningService.create(node) - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "sc20-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(setup.name, null, listOf(platform)) } override fun close() {} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt index ec547e84..3ce79d69 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt @@ -24,31 +24,21 @@ package org.opendc.format.trace -import org.opendc.core.workload.Workload +import java.util.UUID /** * An entry in a workload trace. * - * @param T The shape of the workload in this entry. + * @param uid The unique identifier of the entry. + * @param name The name of the entry. + * @param start The start time of the workload. + * @param workload The workload of the entry. + * @param meta The meta-data associated with the workload. */ -public interface TraceEntry<T : Workload> { - /** - * The time of submission of the workload. - */ - public val submissionTime: Long - - /** - * The workload in this trace entry. - */ - public val workload: T - - /** - * Extract the submission time from this entry. - */ - public operator fun component1(): Long = submissionTime - - /** - * Extract the workload from this entry. - */ - public operator fun component2(): T = workload -} +public data class TraceEntry<out T>( + val uid: UUID, + val name: String, + val start: Long, + val workload: T, + val meta: Map<String, Any> +) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt index a0beec3e..7df1acd3 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt @@ -22,14 +22,13 @@ package org.opendc.format.trace -import org.opendc.core.workload.Workload import java.io.Closeable /** - * An interface for reading [Workload]s into memory. + * An interface for reading workloads into memory. * * This interface must guarantee that the entries are delivered in order of submission time. * * @param T The shape of the workloads supported by this reader. */ -public interface TraceReader<T : Workload> : Iterator<TraceEntry<T>>, Closeable +public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt deleted file mode 100644 index 54fb6214..00000000 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import org.opendc.core.workload.Workload -import java.io.Closeable - -/** - * An interface for persisting workload traces (e.g. to disk). - * - * @param T The type of [Workload] supported by this writer. - */ -public interface TraceWriter<T : Workload> : Closeable { - /** - * Write an entry to the trace. - * - * Entries must be written in order of submission time. Failing to do so results in a [IllegalArgumentException]. - * - * @param submissionTime The time of submission of the workload. - * @param workload The workload to write to the trace. - */ - public fun write(submissionTime: Long, workload: T) -} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 1571b17d..769b2b13 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -22,14 +22,12 @@ package org.opendc.format.trace.bitbrains -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -45,17 +43,17 @@ import kotlin.math.min public class BitbrainsTraceReader( traceDirectory: File, performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() var timestampCol = 0 var coreCol = 0 @@ -132,50 +130,27 @@ public class BitbrainsTraceReader( ) val workload = SimTraceWorkload(flopsHistory.asSequence()) - val vmWorkload = ComputeWorkload( + entries[vmId] = TraceEntry( uuid, - "VM Workload $vmId", - UnnamedUser, - Image( - uuid, - vmId.toString(), - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - entries[vmId] = TraceEntryImpl( + vmId.toString(), startTime, - vmWorkload + workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory, + "workload" to workload + ) ) } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = entries.values.sortedBy { it.start }.iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index cd7aff3c..e68afeb7 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -22,15 +22,13 @@ package org.opendc.format.trace.gwf -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import java.io.BufferedReader import java.io.File import java.io.InputStream @@ -88,7 +86,8 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntryImpl>() + val workflows = mutableMapOf<Long, Job>() + val starts = mutableMapOf<Long, Long>() val tasks = mutableMapOf<Long, Task>() val taskDependencies = mutableMapOf<Task, List<Long>>() @@ -131,22 +130,21 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { val flops: Long = 4000 * runtime * cores - val entry = entries.getOrPut(workflowId) { - TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) } - val workflow = entry.workload val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to workload)), HashSet(), mapOf( + "workload" to workload, WORKFLOW_TASK_CORES to cores, WORKFLOW_TASK_DEADLINE to (runtime * 1000) ), ) - entry.submissionTime = min(entry.submissionTime, submitTime) + starts.merge(workflowId, submitTime, ::min) (workflow.tasks as MutableSet<Task>).add(task) tasks[taskId] = task taskDependencies[task] = dependencies @@ -165,7 +163,9 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } + .sortedBy { it.start } + .iterator() } override fun hasNext(): Boolean = iterator.hasNext() @@ -173,20 +173,4 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { override fun next(): TraceEntry<Job> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: Job - ) : TraceEntry<Job> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt index 07785632..1eb4bac2 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -22,14 +22,12 @@ package org.opendc.format.trace.sc20 -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -49,17 +47,17 @@ public class Sc20TraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<UUID, TraceEntry<ComputeWorkload>>() + val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>() val timestampCol = 0 val cpuUsageCol = 1 @@ -85,7 +83,7 @@ public class Sc20TraceReader( var vmId = "" var maxCores = -1 var requiredMemory = -1L - var timestamp = -1L + var timestamp: Long var cores = -1 var minTime = Long.MAX_VALUE @@ -157,50 +155,27 @@ public class Sc20TraceReader( Random(random.nextInt()) ) val workload = SimTraceWorkload(flopsFragments.asSequence()) - val vmWorkload = ComputeWorkload( + entries[uuid] = TraceEntry( uuid, - "VM Workload $vmId", - UnnamedUser, - Image( - uuid, - vmId, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - entries[uuid] = TraceEntryImpl( + vmId, minTime, - vmWorkload + workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory, + "workload" to workload + ) ) } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = entries.values.sortedBy { it.start }.iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt index ead20c35..0d1f3cea 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -22,12 +22,10 @@ package org.opendc.format.trace.swf -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -43,17 +41,17 @@ import java.util.* public class SwfTraceReader( file: File, maxNumCores: Int = -1 -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() val jobNumberCol = 0 val submitTimeCol = 1 // seconds (begin of trace is 0) @@ -73,7 +71,6 @@ public class SwfTraceReader( var slicedWaitTime: Long var flopsPerSecond: Long var flopsPartialSlice: Long - var flopsFullSlice: Long var runtimePartialSliceRemainder: Long BufferedReader(FileReader(file)).use { reader -> @@ -127,7 +124,6 @@ public class SwfTraceReader( flopsPerSecond = 4_000L * cores runtimePartialSliceRemainder = runTime % sliceDuration flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder - flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice for ( tick in (submitTime + slicedWaitTime) @@ -155,48 +151,27 @@ public class SwfTraceReader( val uuid = UUID(0L, jobNumber) val workload = SimTraceWorkload(flopsHistory.asSequence()) - val vmWorkload = ComputeWorkload( + entries[jobNumber] = TraceEntry( uuid, - "SWF Workload $jobNumber", - UnnamedUser, - Image( - uuid, - jobNumber.toString(), - mapOf( - "cores" to cores, - "required-memory" to memory, - "workload" to workload - ) + jobNumber.toString(), + submitTime, + workload, + mapOf( + "cores" to cores, + "required-memory" to memory, + "workload" to workload ) ) - - entries[jobNumber] = TraceEntryImpl(submitTime, vmWorkload) } } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = entries.values.sortedBy { it.start }.iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index 5a271fab..feadf61f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,15 +25,13 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import java.util.UUID import kotlin.math.min @@ -53,10 +51,12 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntryImpl>() + val workflows = mutableMapOf<Long, Job>() + val starts = mutableMapOf<Long, Long>() val tasks = mutableMapOf<Long, Task>() val taskDependencies = mutableMapOf<Task, List<Long>>() + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build() while (true) { @@ -74,29 +74,22 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { val flops: Long = 4100 * (runtime / 1000) * cores - val entry = entries.getOrPut(workflowId) { - TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) } - val workflow = entry.workload val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - Image( - UUID.randomUUID(), - "<unnamed>", - mapOf( - "workload" to workload - ) - ), HashSet(), mapOf( + "workload" to workload, WORKFLOW_TASK_CORES to cores, WORKFLOW_TASK_DEADLINE to runtime ) ) - entry.submissionTime = min(entry.submissionTime, submitTime) + starts.merge(workflowId, submitTime, ::min) (workflow.tasks as MutableSet<Task>).add(task) tasks[taskId] = task taskDependencies[task] = dependencies @@ -112,7 +105,9 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } + .sortedBy { it.start } + .iterator() } override fun hasNext(): Boolean = iterator.hasNext() @@ -120,20 +115,4 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { override fun next(): TraceEntry<Job> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: Job - ) : TraceEntry<Job> } diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt index 7e3d2623..e0e049cf 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -32,14 +32,14 @@ class SwfTraceReaderTest { internal fun testParseSwf() { val reader = SwfTraceReader(File(SwfTraceReaderTest::class.java.getResource("/swf_trace.txt").toURI())) var entry = reader.next() - assertEquals(0, entry.submissionTime) + assertEquals(0, entry.start) // 1961 slices for waiting, 3 full and 1 partial running slices - assertEquals(1965, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) + assertEquals(1965, (entry.workload as SimTraceWorkload).trace.toList().size) entry = reader.next() - assertEquals(164472, entry.submissionTime) + assertEquals(164472, entry.start) // 1188 slices for waiting, 0 full and 1 partial running slices - assertEquals(1189, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) - assertEquals(0.25, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().last().usage) + assertEquals(1189, (entry.workload as SimTraceWorkload).trace.toList().size) + assertEquals(0.25, (entry.workload as SimTraceWorkload).trace.toList().last().usage) } } diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt index 58d96657..bcfa7553 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt @@ -36,11 +36,11 @@ class WtfTraceReaderTest { fun testParseWtf() { val reader = WtfTraceReader("src/test/resources/wtf-trace") var entry = reader.next() - assertEquals(0, entry.submissionTime) + assertEquals(0, entry.start) assertEquals(23, entry.workload.tasks.size) entry = reader.next() - assertEquals(333387, entry.submissionTime) + assertEquals(333387, entry.start) assertEquals(23, entry.workload.tasks.size) } } diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt deleted file mode 100644 index f1d4ea2e..00000000 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.metal - -/** - * An enumeration describing the possible states of a bare-metal compute node. - */ -public enum class NodeState { - /** - * The node is booting. - */ - BOOT, - - /** - * The node is powered off. - */ - SHUTOFF, - - /** - * The node is active and running. - */ - ACTIVE, - - /** - * The node is in error. - */ - ERROR, - - /** - * The state of the node is unknown. - */ - UNKNOWN, -} diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt deleted file mode 100644 index 3b15be94..00000000 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.metal.driver - -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.api.Image -import org.opendc.compute.api.Server -import org.opendc.core.services.AbstractServiceKey -import org.opendc.metal.Node -import java.util.UUID - -/** - * A driver interface for the management interface of a bare-metal compute node. - */ -public interface BareMetalDriver { - /** - * The [Node] that is controlled by this driver. - */ - public val node: Flow<Node> - - /** - * The amount of work done by the machine in percentage with respect to the total amount of processing power - * available. - */ - public val usage: Flow<Double> - - /** - * Initialize the driver. - */ - public suspend fun init(): Node - - /** - * Start the bare metal node with the specified boot disk image. - */ - public suspend fun start(): Node - - /** - * Stop the bare metal node if it is running. - */ - public suspend fun stop(): Node - - /** - * Reboot the bare metal node. - */ - public suspend fun reboot(): Node - - /** - * Update the boot disk image of the compute node. - * - * Changing the boot disk image of node does not affect it while the node is running. In order to start the new boot - * disk image, the compute node must be restarted. - */ - public suspend fun setImage(image: Image): Node - - /** - * Obtain the state of the compute node. - */ - public suspend fun refresh(): Node - - /** - * A key that allows access to the [BareMetalDriver] instance from a [Server] that runs on the bare-metal machine. - */ - public companion object Key : AbstractServiceKey<BareMetalDriver>(UUID.randomUUID(), "bare-metal:driver") -} diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt deleted file mode 100644 index 2d6353c8..00000000 --- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.metal.service - -import kotlinx.coroutines.CancellationException -import org.opendc.compute.api.Image -import org.opendc.metal.Node -import org.opendc.metal.driver.BareMetalDriver - -/** - * A very basic implementation of the [ProvisioningService]. - */ -public class SimpleProvisioningService : ProvisioningService { - /** - * The active nodes in this service. - */ - private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf() - - override suspend fun create(driver: BareMetalDriver): Node { - val node = driver.init() - nodes[node] = driver - return node - } - - override suspend fun nodes(): Set<Node> = nodes.keys - - override suspend fun refresh(node: Node): Node { - return nodes[node]!!.refresh() - } - - override suspend fun deploy(node: Node, image: Image): Node { - val driver = nodes[node]!! - driver.setImage(image) - return driver.reboot() - } - - override suspend fun stop(node: Node): Node { - val driver = nodes[node]!! - return try { - driver.stop() - } catch (e: CancellationException) { - node - } - } -} diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts index d0b80cc7..d07fe7a6 100644 --- a/simulator/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc-runner-web/build.gradle.kts @@ -34,7 +34,6 @@ application { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) implementation(project(":opendc-compute:opendc-compute-simulator")) implementation(project(":opendc-format")) implementation(project(":opendc-experiments:opendc-experiments-capelin")) diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 482fe754..b9aeecb8 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -46,11 +46,11 @@ import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolic import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* -import org.opendc.experiments.capelin.experiment.attachMonitor -import org.opendc.experiments.capelin.experiment.createFailureDomain -import org.opendc.experiments.capelin.experiment.createProvisioner -import org.opendc.experiments.capelin.experiment.processTrace +import org.opendc.experiments.capelin.attachMonitor +import org.opendc.experiments.capelin.createComputeService +import org.opendc.experiments.capelin.createFailureDomain import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.capelin.processTrace import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader @@ -247,7 +247,7 @@ public class RunnerCli : CliktCommand(name = "runner") { val tracer = EventTracer(clock) testScope.launch { - val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( + val scheduler = createComputeService( this, clock, environment, @@ -262,7 +262,7 @@ public class RunnerCli : CliktCommand(name = "runner") { clock, seeder.nextInt(), operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, - bareMetalProvisioner, + scheduler, chan ) } else { @@ -287,7 +287,6 @@ public class RunnerCli : CliktCommand(name = "runner") { failureDomain?.cancel() scheduler.close() - provisioner.close() } try { diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index 2f11347d..e7e99a3d 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -28,36 +28,24 @@ import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Field import com.mongodb.client.model.Filters import com.mongodb.client.model.Projections -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch import org.bson.Document import org.bson.types.ObjectId -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.NODE_CLUSTER -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import java.time.Clock import java.util.* /** * A helper class that converts the MongoDB topology into an OpenDC environment. */ public class TopologyParser(private val collection: MongoCollection<Document>, private val id: ObjectId) : EnvironmentReader { - /** - * Parse the topology with the specified [id]. - */ - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { - val nodes = mutableListOf<SimBareMetalDriver>() + + public override fun read(): List<MachineDef> { + val nodes = mutableListOf<MachineDef>() val random = Random(0) for (machine in fetchMachines(id)) { @@ -85,36 +73,17 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p val energyConsumptionW = machine.getList("cpus", Document::class.java).sumBy { it.getInteger("energyConsumptionW") }.toDouble() nodes.add( - SimBareMetalDriver( - coroutineScope, - clock, + MachineDef( UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$position", - mapOf(NODE_CLUSTER to clusterId), + mapOf("cluster" to clusterId), SimMachineModel(processors, memoryUnits), LinearPowerModel(2 * energyConsumptionW, .5) ) ) } - val provisioningService = SimpleProvisioningService() - coroutineScope.launch { - for (node in nodes) { - provisioningService.create(node) - } - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "opendc-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(fetchName(id), null, listOf(platform)) + return nodes } override fun close() {} diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index fe814c76..a8ac6c10 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -27,10 +27,9 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.telemetry.HostEvent -import org.opendc.metal.Node -import org.opendc.metal.NodeState import kotlin.math.max /** @@ -38,7 +37,7 @@ import kotlin.math.max */ public class WebExperimentMonitor : ExperimentMonitor { private val logger = KotlinLogging.logger {} - private val currentHostEvent = mutableMapOf<Node, HostEvent>() + private val currentHostEvent = mutableMapOf<Host, HostEvent>() private var startTime = -1L override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { @@ -50,12 +49,8 @@ public class WebExperimentMonitor : ExperimentMonitor { } } - override fun reportHostStateChange( - time: Long, - driver: Host, - host: Node - ) { - logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } + override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { + logger.debug { "Host ${host.uid} changed state $newState [$time]" } val previousEvent = currentHostEvent[host] @@ -84,9 +79,9 @@ public class WebExperimentMonitor : ExperimentMonitor { ) } - private val lastPowerConsumption = mutableMapOf<Node, Double>() + private val lastPowerConsumption = mutableMapOf<Host, Double>() - override fun reportPowerConsumption(host: Node, draw: Double) { + override fun reportPowerConsumption(host: Host, draw: Double) { lastPowerConsumption[host] = draw } @@ -99,7 +94,7 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - host: Node, + host: Host, duration: Long ) { val previousEvent = currentHostEvent[host] @@ -117,7 +112,7 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -135,7 +130,7 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -155,7 +150,7 @@ public class WebExperimentMonitor : ExperimentMonitor { cpuUsage, cpuDemand, lastPowerConsumption[host] ?: 200.0, - host.flavor.cpuCount + host.model.cpuCount ) currentHostEvent[host] = event @@ -164,7 +159,7 @@ public class WebExperimentMonitor : ExperimentMonitor { } private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap<Node, HostMetrics> = mutableMapOf() + private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf() private fun processHostEvent(event: HostEvent) { val slices = event.duration / SLICE_LENGTH @@ -175,14 +170,14 @@ public class WebExperimentMonitor : ExperimentMonitor { hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst, hostAggregateMetrics.totalInterferedBurst + event.interferedBurst, hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)), - hostAggregateMetrics.totalFailureSlices + if (event.node.state != NodeState.ACTIVE) slices.toLong() else 0, - hostAggregateMetrics.totalFailureVmSlices + if (event.node.state != NodeState.ACTIVE) event.vmCount * slices.toLong() else 0 + hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0, + hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0 ) - hostMetrics.compute(event.node) { _, prev -> + hostMetrics.compute(event.host) { _, prev -> HostMetrics( - (event.cpuUsage.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0), - (event.cpuDemand.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0), + (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0), event.vmCount + (prev?.vmCount ?: 0), 1 + (prev?.count ?: 0) ) diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt deleted file mode 100644 index 996e7700..00000000 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.utils.flow - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.ConflatedBroadcastChannel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.asFlow - -/** - * A [Flow] that contains a single value that changes over time. - * - * This class exists to implement the DataFlow/StateFlow functionality that will be implemented in `kotlinx-coroutines` - * in the future, but is not available yet. - * See: https://github.com/Kotlin/kotlinx.coroutines/pull/1354 - */ -public interface StateFlow<T> : Flow<T> { - /** - * The current value of this flow. - * - * Setting a value that is [equal][Any.equals] to the previous one does nothing. - */ - public var value: T -} - -/** - * Creates a [StateFlow] with a given initial [value]. - */ -@Suppress("FunctionName") -public fun <T> StateFlow(value: T): StateFlow<T> = StateFlowImpl(value) - -/** - * Internal implementation of the [StateFlow] interface. - */ -@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -private class StateFlowImpl<T>(initialValue: T) : StateFlow<T> { - /** - * The [BroadcastChannel] to back this flow. - */ - private val chan = ConflatedBroadcastChannel(initialValue) - - /** - * The internal [Flow] backing this flow. - */ - private val flow = chan.asFlow() - - public override var value: T = initialValue - set(value) { - chan.offer(value) - field = value - } - - @InternalCoroutinesApi - override suspend fun collect(collector: FlowCollector<T>) = flow.collect(collector) -} diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/User.kt b/simulator/opendc-workflow/build.gradle.kts index fc542cef..3cefa409 100644 --- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/User.kt +++ b/simulator/opendc-workflow/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,14 +20,4 @@ * SOFTWARE. */ -package org.opendc.core - -/** - * A user of the cloud network. - */ -public interface User : Identity { - /** - * The name of the user. - */ - override val name: String -} +description = "Workflow orchestration for OpenDC" diff --git a/simulator/opendc-metal/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts index 9207de18..d3e67bee 100644 --- a/simulator/opendc-metal/build.gradle.kts +++ b/simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Bare-metal provisioning in OpenDC" +description = "Workflow orchestration service API for OpenDC" /* Build configuration */ plugins { @@ -29,10 +29,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) - implementation("io.github.microutils:kotlin-logging") } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt index f1cfdf65..5e8b0b9e 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt +++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,8 @@ * SOFTWARE. */ -package org.opendc.workflows.workload +package org.opendc.workflow.api -import org.opendc.core.User -import org.opendc.core.workload.Workload import java.util.* /** @@ -31,17 +29,15 @@ import java.util.* * * @property uid A unique identified of this workflow. * @property name The name of this workflow. - * @property owner The owner of the workflow. * @property tasks The tasks that are part of this workflow. * @property metadata Additional metadata for the job. */ public data class Job( - override val uid: UUID, - override val name: String, - override val owner: User, - public val tasks: Set<Task>, - public val metadata: Map<String, Any> = emptyMap() -) : Workload { + val uid: UUID, + val name: String, + val tasks: Set<Task>, + val metadata: Map<String, Any> = emptyMap() +) { override fun equals(other: Any?): Boolean = other is Job && uid == other.uid override fun hashCode(): Int = uid.hashCode() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt index 4305aa57..db208998 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt +++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package org.opendc.workflows.workload +package org.opendc.workflow.api /** * Meta-data key for the deadline of a task. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt index 4c6d2842..d91f9879 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt +++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,8 @@ * SOFTWARE. */ -package org.opendc.workflows.workload +package org.opendc.workflow.api -import org.opendc.compute.api.Image -import org.opendc.core.Identity import java.util.* /** @@ -38,12 +34,11 @@ import java.util.* * @property metadata Additional metadata for this task. */ public data class Task( - override val uid: UUID, - override val name: String, - public val image: Image, - public val dependencies: Set<Task>, - public val metadata: Map<String, Any> = emptyMap() -) : Identity { + val uid: UUID, + val name: String, + val dependencies: Set<Task>, + val metadata: Map<String, Any> = emptyMap() +) { override fun equals(other: Any?): Boolean = other is Task && uid == other.uid override fun hashCode(): Int = uid.hashCode() diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts index b6a2fc45..12a54235 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Workflow service for OpenDC" +description = "Workflow orchestration service for OpenDC" /* Build configuration */ plugins { @@ -30,7 +30,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) + api(project(":opendc-workflow:opendc-workflow-api")) api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt index bcf93562..bb2ad6c6 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service import org.opendc.trace.core.Event -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task /** * An event emitted by the [WorkflowService]. diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt new file mode 100644 index 00000000..2f83e376 --- /dev/null +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.api.ComputeClient +import org.opendc.trace.core.EventTracer +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A service for cloud workflow management. + * + * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. + */ +public interface WorkflowService : AutoCloseable { + /** + * The events emitted by the workflow scheduler. + */ + public val events: Flow<WorkflowEvent> + + /** + * Submit the specified [Job] to the workflow service for scheduling. + */ + public suspend fun submit(job: Job) + + /** + * Terminate the lifecycle of the workflow service, stopping all running workflows. + */ + public override fun close() + + public companion object { + /** + * Construct a new [WorkflowService] implementation. + * + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param tracer The event tracer to use. + * @param compute The compute client to use. + * @param mode The scheduling mode to use. + * @param jobAdmissionPolicy The job admission policy to use. + * @param jobOrderPolicy The job order policy to use. + * @param taskEligibilityPolicy The task eligibility policy to use. + * @param taskOrderPolicy The task order policy to use. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + tracer: EventTracer, + compute: ComputeClient, + mode: WorkflowSchedulerMode, + jobAdmissionPolicy: JobAdmissionPolicy, + jobOrderPolicy: JobOrderPolicy, + taskEligibilityPolicy: TaskEligibilityPolicy, + taskOrderPolicy: TaskOrderPolicy + ): WorkflowService { + return WorkflowServiceImpl( + context, + clock, + tracer, + compute, + mode, + jobAdmissionPolicy, + jobOrderPolicy, + taskEligibilityPolicy, + taskOrderPolicy + ) + } + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt index 89849f6a..1bb67169 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal -import org.opendc.workflows.workload.Job +import org.opendc.workflow.api.Job public class JobState(public val job: Job, public val submittedAt: Long) { /** diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt index ef9714c2..c3ce1492 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal import org.opendc.compute.api.Server -import org.opendc.workflows.workload.Task +import org.opendc.workflow.api.Task public class TaskState(public val job: JobState, public val task: Task) { /** @@ -39,12 +39,12 @@ public class TaskState(public val job: JobState, public val task: Task) { /** * The dependencies of this task. */ - public val dependencies: HashSet<TaskState> = HashSet<TaskState>() + public val dependencies: HashSet<TaskState> = HashSet() /** * The dependents of this task. */ - public val dependents: HashSet<TaskState> = HashSet<TaskState>() + public val dependents: HashSet<TaskState> = HashSet() /** * A flag to indicate whether this workflow task instance is a workflow root. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt index 99f5bb87..fe941d09 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal /** * The state of a workflow task. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt index 18721889..29c6aeea 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal -public interface StageWorkflowSchedulerListener { - public fun cycleStarted(scheduler: StageWorkflowService) {} - public fun cycleFinished(scheduler: StageWorkflowService) {} +public interface WorkflowSchedulerListener { + public fun cycleStarted(scheduler: WorkflowServiceImpl) {} + public fun cycleFinished(scheduler: WorkflowServiceImpl) {} public fun jobSubmitted(job: JobState) {} public fun jobStarted(job: JobState) {} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 6b348ed4..85a88acd 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch @@ -32,21 +33,24 @@ import org.opendc.compute.api.* import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow import org.opendc.trace.core.enable -import org.opendc.workflows.service.stage.job.JobAdmissionPolicy -import org.opendc.workflows.service.stage.job.JobOrderPolicy -import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.TaskOrderPolicy -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.service.* +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock import java.util.* +import kotlin.coroutines.CoroutineContext /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Datacenter Scheduling. */ -public class StageWorkflowService( - internal val coroutineScope: CoroutineScope, +public class WorkflowServiceImpl( + context: CoroutineContext, internal val clock: Clock, internal val tracer: EventTracer, private val computeClient: ComputeClient, @@ -57,6 +61,11 @@ public class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy ) : WorkflowService, ServerWatcher { /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + internal val scope = CoroutineScope(context) + + /** * The logger instance to use. */ private val logger = KotlinLogging.logger {} @@ -99,17 +108,17 @@ public class StageWorkflowService( /** * The root listener of this scheduler. */ - private val rootListener = object : StageWorkflowSchedulerListener { + private val rootListener = object : WorkflowSchedulerListener { /** * The listeners to delegate to. */ - val listeners = mutableSetOf<StageWorkflowSchedulerListener>() + val listeners = mutableSetOf<WorkflowSchedulerListener>() - override fun cycleStarted(scheduler: StageWorkflowService) { + override fun cycleStarted(scheduler: WorkflowServiceImpl) { listeners.forEach { it.cycleStarted(scheduler) } } - override fun cycleFinished(scheduler: StageWorkflowService) { + override fun cycleFinished(scheduler: WorkflowServiceImpl) { listeners.forEach { it.cycleFinished(scheduler) } } @@ -145,6 +154,7 @@ public class StageWorkflowService( private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic + private lateinit var image: Image init { this.mode = mode(this) @@ -152,6 +162,9 @@ public class StageWorkflowService( this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) + scope.launch { + image = computeClient.newImage("workflow-runner") + } } override val events: Flow<WorkflowEvent> = tracer.openRecording().let { @@ -190,6 +203,10 @@ public class StageWorkflowService( requestCycle() } + override fun close() { + scope.cancel() + } + /** * Indicate to the scheduler that a scheduling cycle is needed. */ @@ -214,7 +231,12 @@ public class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job)) + tracer.commit( + WorkflowEvent.JobStarted( + this, + jobInstance.job + ) + ) rootListener.jobStarted(jobInstance) } @@ -258,16 +280,27 @@ public class StageWorkflowService( val instance = taskQueue.peek() val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1 - val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task - val image = instance.task.image - coroutineScope.launch { - val server = computeClient.newServer(instance.task.name, image, flavor) + val image = image + scope.launch { + val flavor = computeClient.newFlavor( + instance.task.name, + cores, + 1000 + ) // TODO How to determine memory usage for workflow task + val server = computeClient.newServer( + instance.task.name, + image, + flavor, + start = false, + meta = instance.task.metadata + ) instance.state = TaskStatus.ACTIVE instance.server = server taskByServer[server] = instance - server.watch(this@StageWorkflowService) + server.watch(this@WorkflowServiceImpl) + server.start() } activeTasks += instance @@ -278,20 +311,28 @@ public class StageWorkflowService( public override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { - ServerState.ACTIVE -> { + ServerState.PROVISIONING -> { + } + ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() tracer.commit( WorkflowEvent.TaskStarted( - this@StageWorkflowService, + this@WorkflowServiceImpl, task.job.job, task.task ) ) rootListener.taskStarted(task) } - ServerState.SHUTOFF, ServerState.ERROR -> { + ServerState.TERMINATED, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() + + scope.launch { + server.delete() + server.flavor.delete() + } + val job = task.job task.state = TaskStatus.FINISHED task.finishedAt = clock.millis() @@ -299,7 +340,7 @@ public class StageWorkflowService( activeTasks -= task tracer.commit( WorkflowEvent.TaskFinished( - this@StageWorkflowService, + this@WorkflowServiceImpl, task.job.job, task.task ) @@ -322,6 +363,8 @@ public class StageWorkflowService( requestCycle() } + ServerState.DELETED -> { + } else -> throw IllegalStateException() } } @@ -332,11 +375,11 @@ public class StageWorkflowService( rootListener.jobFinished(job) } - public fun addListener(listener: StageWorkflowSchedulerListener) { + public fun addListener(listener: WorkflowSchedulerListener) { rootListener.listeners += listener } - public fun removeListener(listener: StageWorkflowSchedulerListener) { + public fun removeListener(listener: WorkflowSchedulerListener) { rootListener.listeners -= listener } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt index d76579f9..359fc223 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage +package org.opendc.workflow.service.scheduler -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.io.Serializable /** @@ -32,5 +32,5 @@ public interface StagePolicy<T : Any> : Serializable { /** * Build the logic of the stage policy. */ - public operator fun invoke(scheduler: StageWorkflowService): T + public operator fun invoke(scheduler: WorkflowServiceImpl): T } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt index cf8f92e0..58e7893f 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.scheduler import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * The operating mode of a workflow scheduler. @@ -44,9 +44,9 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. */ public object Interactive : WorkflowSchedulerMode() { - override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { override fun requestCycle() { - scheduler.coroutineScope.launch { scheduler.schedule() } + scheduler.scope.launch { scheduler.schedule() } } } @@ -59,14 +59,14 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo public data class Batch(val quantum: Long) : WorkflowSchedulerMode() { private var next: kotlinx.coroutines.Job? = null - override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { override fun requestCycle() { if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. val delay = quantum - (scheduler.clock.millis() % quantum) - val job = scheduler.coroutineScope.launch { + val job = scheduler.scope.launch { delay(delay) next = null scheduler.schedule() @@ -85,12 +85,12 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { private var next: kotlinx.coroutines.Job? = null - override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { override fun requestCycle() { if (next == null) { val delay = random.nextInt(200).toLong() - val job = scheduler.coroutineScope.launch { + val job = scheduler.scope.launch { delay(delay) next = null scheduler.schedule() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt index 1190a408..1b5b91b9 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,21 +20,23 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobOrderPolicy] that orders jobs based on its critical path length. */ public data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = - object : Comparator<JobState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + object : + Comparator<JobState>, + WorkflowSchedulerListener { private val results = HashMap<Job, Long>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt index 0e5a42c0..ed3acff7 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.scheduler.StagePolicy /** * A policy interface for admitting [JobState]s to a scheduling cycle. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt index 83d42b2d..adaa6671 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.scheduler.StagePolicy /** * A policy interface for ordering admitted workflows in the scheduling queue. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt index 6f6ccb50..6a0bfeb9 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobAdmissionPolicy] that limits the amount of active jobs in the system. @@ -31,7 +31,7 @@ import org.opendc.workflows.service.StageWorkflowService * @property limit The maximum number of concurrent jobs in the system. */ public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionPolicy { - override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { override fun invoke( job: JobState ): JobAdmissionPolicy.Advice = diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt index ac74f090..31f8f8db 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobAdmissionPolicy] that admits all jobs. */ public object NullJobAdmissionPolicy : JobAdmissionPolicy { - override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt index 6c747261..1b359125 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,23 +20,23 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.workload.Job +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.util.* import kotlin.collections.HashMap -import kotlin.collections.getValue -import kotlin.collections.set /** * A [JobOrderPolicy] that randomly orders jobs. */ public object RandomJobOrderPolicy : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = - object : Comparator<JobState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + object : + Comparator<JobState>, + WorkflowSchedulerListener { private val random = Random(123) private val ids = HashMap<Job, Int>() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt index c1c244c3..6998606d 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has. */ public data class SizeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = compareBy { it.tasks.size.let { if (ascending) it else -it } } override fun toString(): String { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt index 005f8153..53d06023 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobOrderPolicy] orders jobs in FIFO order. */ public data class SubmissionTimeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = compareBy { it.submittedAt.let { if (ascending) it else -it } } override fun toString(): String { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt index 6a465746..821d4964 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system. */ public data class ActiveTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt index f3f19ef5..42804f5a 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import kotlin.math.max /** @@ -36,8 +36,8 @@ import kotlin.math.max * the average. */ public data class BalancingTaskEligibilityPolicy(public val tolerance: Double = 1.5) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = - object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt index 0020023f..dae7ad99 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks. */ public data class CompletionTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val finished = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt index a9f5eb84..7786f6ec 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has. */ public data class DependenciesTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { it.task.dependencies.size.let { if (ascending) it else -it } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt index e5a9f159..4fb835d7 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has. */ public data class DependentsTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { it.dependents.size.let { if (ascending) it else -it } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt index 7ce8ccce..3a634de7 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job. */ public data class DurationHistoryTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val results = HashMap<JobState, MutableList<Long>>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt index 3674eb01..d9fde53a 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,15 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.util.* import kotlin.collections.HashMap import kotlin.collections.getValue -import kotlin.collections.minusAssign import kotlin.collections.set /** @@ -37,8 +36,8 @@ import kotlin.collections.set */ public data class DurationTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val results = HashMap<UUID, Long>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt index 2dddbc7c..229460df 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system. */ public data class LimitPerJobTaskEligibilityPolicy(public val limit: Int) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = - object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt index fdc1fd5e..57aa0d58 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system. */ public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { override fun invoke( task: TaskState ): TaskEligibilityPolicy.Advice = diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt index b40f9823..cfe2aeed 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskEligibilityPolicy] that always allows new tasks to enter. */ public object NullTaskEligibilityPolicy : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = Logic private object Logic : TaskEligibilityPolicy.Logic { override fun invoke( diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt index a0691b23..a01439c2 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,17 +20,17 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.util.* /** * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability]. */ public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { val random = Random(123) override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt index 890e7165..c12d6a66 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,20 +20,20 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.workload.Task +import org.opendc.workflow.api.Task +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import kotlin.random.Random /** * A [TaskOrderPolicy] that orders the tasks randomly. */ public object RandomTaskOrderPolicy : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val random = Random(123) private val ids = HashMap<Task, Int>() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt index 6b0199b8..e9bbf815 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue. */ public data class SubmissionTimeTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { it.job.submittedAt.let { if (ascending) it else -it } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt index 37597709..ee31aee2 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.scheduler.StagePolicy /** * A policy interface for determining the eligibility of tasks in a scheduling cycle. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt index 5feac6d0..fffcb765 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.scheduler.StagePolicy /** * This interface represents the **T2** stage of the Reference Architecture for Topology Schedulers and provides the diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt index 4207cdfd..2161f5f2 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,9 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach @@ -38,23 +35,24 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy -import org.opendc.compute.simulator.SimHostProvisioner +import org.opendc.compute.simulator.SimHost import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer -import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy -import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy import kotlin.math.max /** - * Integration test suite for the [StageWorkflowService]. + * Integration test suite for the [WorkflowServiceImpl]. */ -@DisplayName("StageWorkflowService") +@DisplayName("WorkflowServiceImpl") @OptIn(ExperimentalCoroutinesApi::class) internal class StageWorkflowSchedulerIntegrationTest { /** @@ -72,26 +70,27 @@ internal class StageWorkflowSchedulerIntegrationTest { val clock = DelayControllerClockAdapter(testScope) val tracer = EventTracer(clock) - val schedulerAsync = testScope.async { - val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(testScope, clock) } - - val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService] - - // Wait for the bare metal nodes to be spawned - delay(10) + val scheduler = let { + val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) + .use { it.read() } + .map { def -> + SimHost( + def.uid, + def.name, + def.model, + def.meta, + testScope.coroutineContext, + clock, + SimSpaceSharedHypervisorProvider() + ) + } - val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) - val hosts = provisioner.provisionAll() val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) hosts.forEach { compute.addHost(it) } - // Wait for the hypervisors to be spawned - delay(10) - - StageWorkflowService( - testScope, + WorkflowService( + testScope.coroutineContext, clock, tracer, compute.newClient(), @@ -104,7 +103,6 @@ internal class StageWorkflowSchedulerIntegrationTest { } testScope.launch { - val scheduler = schedulerAsync.await() scheduler.events .onEach { event -> when (event) { @@ -119,13 +117,12 @@ internal class StageWorkflowSchedulerIntegrationTest { testScope.launch { val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) - val scheduler = schedulerAsync.await() while (reader.hasNext()) { - val (time, job) = reader.next() + val entry = reader.next() jobsSubmitted++ - delay(max(0, time - clock.millis())) - scheduler.submit(job) + delay(max(0, entry.start - clock.millis())) + scheduler.submit(entry.workload) } } diff --git a/simulator/opendc-workflows/src/test/resources/environment.json b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/environment.json index 0965b250..0965b250 100644 --- a/simulator/opendc-workflows/src/test/resources/environment.json +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/environment.json diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml index 70a0eacc..70a0eacc 100644 --- a/simulator/opendc-workflows/src/test/resources/log4j2.xml +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml diff --git a/simulator/opendc-workflows/src/test/resources/trace.gwf b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/trace.gwf index d264b9c3..d264b9c3 100644 --- a/simulator/opendc-workflows/src/test/resources/trace.gwf +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/trace.gwf diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index 7a82adcd..e87dd4d8 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -22,12 +22,11 @@ rootProject.name = "opendc-simulator" include(":opendc-platform") -include(":opendc-core") include(":opendc-compute:opendc-compute-api") include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") -include(":opendc-metal") -include(":opendc-workflows") +include(":opendc-workflow:opendc-workflow-api") +include(":opendc-workflow:opendc-workflow-service") include(":opendc-format") include(":opendc-experiments:opendc-experiments-sc18") include(":opendc-experiments:opendc-experiments-capelin") |
