summaryrefslogtreecommitdiff
path: root/simulator/opendc-workflows
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-09 14:01:55 +0100
committerGitHub <noreply@github.com>2021-03-09 14:01:55 +0100
commit66c2501d95b167f9e7474a45e542f82d2d8e83ff (patch)
tree7c3464a424891ab7c3cb9c0ac77d67256b144f97 /simulator/opendc-workflows
parent2977dd8a5f1d742193eae79364a284e68269f7b5 (diff)
parent75751865179c6cd5a05abb4a0641193595f59b45 (diff)
compute: Improvements to cloud compute model (v1)
This is the first of the pull requests in an attempt to improve the existing cloud compute model (see #86). This pull request restructures the compute API and splits the consumer and service interfaces into different modules: - opendc-compute-api now defines the API interface for the OpenDC Compute module, which can be used by consumers of the OpenDC Compute service. - opendc-compute-service hosts the service implementation for OpenDC Compute and contains all business logic regarding the IaaS platform (such as scheduling). - opendc-compute-simulator implements a "compute driver" for the OpenDC Compute platform that simulates submitted workloads. - Image is now a data-class and does not specify itself the workload to simulate. Instead, the workload should be passed via its tags currently (with key "workload"). In the future, the simulation backend will accept a mapper interface that maps Images to Workloads.
Diffstat (limited to 'simulator/opendc-workflows')
-rw-r--r--simulator/opendc-workflows/build.gradle.kts7
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt30
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt2
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt12
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt45
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt43
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt2
-rw-r--r--simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt15
-rw-r--r--simulator/opendc-workflows/src/test/resources/log4j2.xml35
9 files changed, 64 insertions, 127 deletions
diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts
index b4ffac7d..b6a2fc45 100644
--- a/simulator/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc-workflows/build.gradle.kts
@@ -29,15 +29,16 @@ plugins {
}
dependencies {
+ api(platform(project(":opendc-platform")))
api(project(":opendc-core"))
- api(project(":opendc-compute:opendc-compute-core"))
+ api(project(":opendc-compute:opendc-compute-api"))
api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
- implementation("io.github.microutils:kotlin-logging:${versions.kotlinLogging}")
+ implementation("io.github.microutils:kotlin-logging")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
testImplementation(project(":opendc-compute:opendc-compute-simulator"))
testImplementation(project(":opendc-format"))
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}")
- testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}")
+ testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl")
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index e04c8a4c..6b348ed4 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -25,16 +25,10 @@ package org.opendc.workflows.service
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
-import org.opendc.compute.core.virt.service.VirtProvisioningService
+import org.opendc.compute.api.*
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.consumeAsFlow
import org.opendc.trace.core.enable
@@ -55,13 +49,13 @@ public class StageWorkflowService(
internal val coroutineScope: CoroutineScope,
internal val clock: Clock,
internal val tracer: EventTracer,
- private val provisioningService: VirtProvisioningService,
+ private val computeClient: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
taskOrderPolicy: TaskOrderPolicy
-) : WorkflowService {
+) : WorkflowService, ServerWatcher {
/**
* The logger instance to use.
*/
@@ -103,12 +97,6 @@ public class StageWorkflowService(
internal val taskByServer = mutableMapOf<Server, TaskState>()
/**
- * The load of the system.
- */
- internal val load: Double
- get() = (activeTasks.size / provisioningService.hostCount.toDouble())
-
- /**
* The root listener of this scheduler.
*/
private val rootListener = object : StageWorkflowSchedulerListener {
@@ -205,7 +193,7 @@ public class StageWorkflowService(
/**
* Indicate to the scheduler that a scheduling cycle is needed.
*/
- private suspend fun requestCycle() = mode.requestCycle()
+ private fun requestCycle() = mode.requestCycle()
/**
* Perform a scheduling cycle immediately.
@@ -273,15 +261,13 @@ public class StageWorkflowService(
val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task
val image = instance.task.image
coroutineScope.launch {
- val server = provisioningService.deploy(instance.task.name, image, flavor)
+ val server = computeClient.newServer(instance.task.name, image, flavor)
instance.state = TaskStatus.ACTIVE
instance.server = server
taskByServer[server] = instance
- server.events
- .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
- .launchIn(coroutineScope)
+ server.watch(this@StageWorkflowService)
}
activeTasks += instance
@@ -290,8 +276,8 @@ public class StageWorkflowService(
}
}
- private suspend fun stateChanged(server: Server) {
- when (server.state) {
+ public override fun onStateChanged(server: Server, newState: ServerState) {
+ when (newState) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
index d1eb6704..ef9714c2 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
@@ -22,7 +22,7 @@
package org.opendc.workflows.service
-import org.opendc.compute.core.Server
+import org.opendc.compute.api.Server
import org.opendc.workflows.workload.Task
public class TaskState(public val job: JobState, public val task: Task) {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
index d03adc61..cf8f92e0 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,6 @@ package org.opendc.workflows.service
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
import org.opendc.workflows.service.stage.StagePolicy
/**
@@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
/**
* Request a new scheduling cycle to be performed.
*/
- public suspend fun requestCycle()
+ public fun requestCycle()
}
/**
@@ -46,9 +45,8 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
*/
public object Interactive : WorkflowSchedulerMode() {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
- yield()
- scheduler.schedule()
+ override fun requestCycle() {
+ scheduler.coroutineScope.launch { scheduler.schedule() }
}
}
@@ -62,7 +60,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
@@ -88,7 +86,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
val delay = random.nextInt(200).toLong()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
deleted file mode 100644
index 4f0c269a..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.job
-
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowService
-
-/**
- * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load.
- *
- * @property limit The maximum load before stopping admission.
- */
-public data class LoadJobAdmissionPolicy(public val limit: Double) : JobAdmissionPolicy {
- override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
- override fun invoke(
- job: JobState
- ): JobAdmissionPolicy.Advice =
- if (scheduler.load < limit)
- JobAdmissionPolicy.Advice.ADMIT
- else
- JobAdmissionPolicy.Advice.STOP
- }
-
- override fun toString(): String = "Limit-Load($limit)"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
deleted file mode 100644
index a80a8c63..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.task
-
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
-
-/**
- * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load.
- */
-public data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy {
- override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
- override fun invoke(
- task: TaskState
- ): TaskEligibilityPolicy.Advice =
- if (scheduler.load < limit)
- TaskEligibilityPolicy.Advice.ADMIT
- else
- TaskEligibilityPolicy.Advice.STOP
- }
-
- override fun toString(): String = "Limit-Load($limit)"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
index 1834a4c8..4c6d2842 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
@@ -24,7 +24,7 @@
package org.opendc.workflows.workload
-import org.opendc.compute.core.image.Image
+import org.opendc.compute.api.Image
import org.opendc.core.Identity
import java.util.*
diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 2bfcba35..4207cdfd 100644
--- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -36,11 +36,12 @@ import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.simulator.SimHostProvisioner
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
+import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
@@ -80,7 +81,11 @@ internal class StageWorkflowSchedulerIntegrationTest {
// Wait for the bare metal nodes to be spawned
delay(10)
- val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000)
+ val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider())
+ val hosts = provisioner.provisionAll()
+ val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000)
+
+ hosts.forEach { compute.addHost(it) }
// Wait for the hypervisors to be spawned
delay(10)
@@ -89,7 +94,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
testScope,
clock,
tracer,
- provisioner,
+ compute.newClient(),
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflows/src/test/resources/log4j2.xml
new file mode 100644
index 00000000..70a0eacc
--- /dev/null
+++ b/simulator/opendc-workflows/src/test/resources/log4j2.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2021 AtLarge Research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN" packages="org.apache.logging.log4j.core">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="warn">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>