diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-09 14:01:55 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-03-09 14:01:55 +0100 |
| commit | 66c2501d95b167f9e7474a45e542f82d2d8e83ff (patch) | |
| tree | 7c3464a424891ab7c3cb9c0ac77d67256b144f97 /simulator/opendc-workflows | |
| parent | 2977dd8a5f1d742193eae79364a284e68269f7b5 (diff) | |
| parent | 75751865179c6cd5a05abb4a0641193595f59b45 (diff) | |
compute: Improvements to cloud compute model (v1)
This is the first of the pull requests in an attempt to improve the existing cloud compute model (see #86). This pull request restructures the compute API and splits the consumer and service interfaces into different modules:
- opendc-compute-api now defines the API interface for the OpenDC Compute module, which can be used by consumers of the OpenDC Compute service.
- opendc-compute-service hosts the service implementation for OpenDC Compute and contains all business logic regarding the IaaS platform (such as scheduling).
- opendc-compute-simulator implements a "compute driver" for the OpenDC Compute platform that simulates submitted workloads.
- Image is now a data-class and does not specify itself the workload to simulate. Instead, the workload should be passed via its tags currently (with key "workload"). In the future, the simulation backend will accept a mapper interface that maps Images to Workloads.
Diffstat (limited to 'simulator/opendc-workflows')
9 files changed, 64 insertions, 127 deletions
diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts index b4ffac7d..b6a2fc45 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflows/build.gradle.kts @@ -29,15 +29,16 @@ plugins { } dependencies { + api(platform(project(":opendc-platform"))) api(project(":opendc-core")) - api(project(":opendc-compute:opendc-compute-core")) + api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) - implementation("io.github.microutils:kotlin-logging:${versions.kotlinLogging}") + implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) testImplementation(project(":opendc-compute:opendc-compute-simulator")) testImplementation(project(":opendc-format")) testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") - testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index e04c8a4c..6b348ed4 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -25,16 +25,10 @@ package org.opendc.workflows.service import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.ServerState -import org.opendc.compute.core.virt.service.VirtProvisioningService +import org.opendc.compute.api.* import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow import org.opendc.trace.core.enable @@ -55,13 +49,13 @@ public class StageWorkflowService( internal val coroutineScope: CoroutineScope, internal val clock: Clock, internal val tracer: EventTracer, - private val provisioningService: VirtProvisioningService, + private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, taskOrderPolicy: TaskOrderPolicy -) : WorkflowService { +) : WorkflowService, ServerWatcher { /** * The logger instance to use. */ @@ -103,12 +97,6 @@ public class StageWorkflowService( internal val taskByServer = mutableMapOf<Server, TaskState>() /** - * The load of the system. - */ - internal val load: Double - get() = (activeTasks.size / provisioningService.hostCount.toDouble()) - - /** * The root listener of this scheduler. */ private val rootListener = object : StageWorkflowSchedulerListener { @@ -205,7 +193,7 @@ public class StageWorkflowService( /** * Indicate to the scheduler that a scheduling cycle is needed. */ - private suspend fun requestCycle() = mode.requestCycle() + private fun requestCycle() = mode.requestCycle() /** * Perform a scheduling cycle immediately. @@ -273,15 +261,13 @@ public class StageWorkflowService( val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task val image = instance.task.image coroutineScope.launch { - val server = provisioningService.deploy(instance.task.name, image, flavor) + val server = computeClient.newServer(instance.task.name, image, flavor) instance.state = TaskStatus.ACTIVE instance.server = server taskByServer[server] = instance - server.events - .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } - .launchIn(coroutineScope) + server.watch(this@StageWorkflowService) } activeTasks += instance @@ -290,8 +276,8 @@ public class StageWorkflowService( } } - private suspend fun stateChanged(server: Server) { - when (server.state) { + public override fun onStateChanged(server: Server, newState: ServerState) { + when (newState) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt index d1eb6704..ef9714c2 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt @@ -22,7 +22,7 @@ package org.opendc.workflows.service -import org.opendc.compute.core.Server +import org.opendc.compute.api.Server import org.opendc.workflows.workload.Task public class TaskState(public val job: JobState, public val task: Task) { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt index d03adc61..cf8f92e0 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,7 +24,6 @@ package org.opendc.workflows.service import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.yield import org.opendc.workflows.service.stage.StagePolicy /** @@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo /** * Request a new scheduling cycle to be performed. */ - public suspend fun requestCycle() + public fun requestCycle() } /** @@ -46,9 +45,8 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo */ public object Interactive : WorkflowSchedulerMode() { override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { - yield() - scheduler.schedule() + override fun requestCycle() { + scheduler.coroutineScope.launch { scheduler.schedule() } } } @@ -62,7 +60,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo private var next: kotlinx.coroutines.Job? = null override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { + override fun requestCycle() { if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. @@ -88,7 +86,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo private var next: kotlinx.coroutines.Job? = null override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { - override suspend fun requestCycle() { + override fun requestCycle() { if (next == null) { val delay = random.nextInt(200).toLong() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt deleted file mode 100644 index 4f0c269a..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.job - -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService - -/** - * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load. - * - * @property limit The maximum load before stopping admission. - */ -public data class LoadJobAdmissionPolicy(public val limit: Double) : JobAdmissionPolicy { - override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { - override fun invoke( - job: JobState - ): JobAdmissionPolicy.Advice = - if (scheduler.load < limit) - JobAdmissionPolicy.Advice.ADMIT - else - JobAdmissionPolicy.Advice.STOP - } - - override fun toString(): String = "Limit-Load($limit)" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt deleted file mode 100644 index a80a8c63..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.task - -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState - -/** - * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load. - */ -public data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { - override fun invoke( - task: TaskState - ): TaskEligibilityPolicy.Advice = - if (scheduler.load < limit) - TaskEligibilityPolicy.Advice.ADMIT - else - TaskEligibilityPolicy.Advice.STOP - } - - override fun toString(): String = "Limit-Load($limit)" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt index 1834a4c8..4c6d2842 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt @@ -24,7 +24,7 @@ package org.opendc.workflows.workload -import org.opendc.compute.core.image.Image +import org.opendc.compute.api.Image import org.opendc.core.Identity import java.util.* diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 2bfcba35..4207cdfd 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -36,11 +36,12 @@ import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader +import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer @@ -80,7 +81,11 @@ internal class StageWorkflowSchedulerIntegrationTest { // Wait for the bare metal nodes to be spawned delay(10) - val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000) + val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) + val hosts = provisioner.provisionAll() + val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000) + + hosts.forEach { compute.addHost(it) } // Wait for the hypervisors to be spawned delay(10) @@ -89,7 +94,7 @@ internal class StageWorkflowSchedulerIntegrationTest { testScope, clock, tracer, - provisioner, + compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflows/src/test/resources/log4j2.xml new file mode 100644 index 00000000..70a0eacc --- /dev/null +++ b/simulator/opendc-workflows/src/test/resources/log4j2.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Copyright (c) 2021 AtLarge Research + ~ + ~ Permission is hereby granted, free of charge, to any person obtaining a copy + ~ of this software and associated documentation files (the "Software"), to deal + ~ in the Software without restriction, including without limitation the rights + ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + ~ copies of the Software, and to permit persons to whom the Software is + ~ furnished to do so, subject to the following conditions: + ~ + ~ The above copyright notice and this permission notice shall be included in all + ~ copies or substantial portions of the Software. + ~ + ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + ~ SOFTWARE. + --> + +<Configuration status="WARN" packages="org.apache.logging.log4j.core"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/> + </Console> + </Appenders> + <Loggers> + <Root level="warn"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> |
