summaryrefslogtreecommitdiff
path: root/simulator/opendc-workflows
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-workflows')
-rw-r--r--simulator/opendc-workflows/build.gradle.kts7
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt30
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt2
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt12
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt45
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt43
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt2
-rw-r--r--simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt15
-rw-r--r--simulator/opendc-workflows/src/test/resources/log4j2.xml35
9 files changed, 64 insertions, 127 deletions
diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts
index b4ffac7d..b6a2fc45 100644
--- a/simulator/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc-workflows/build.gradle.kts
@@ -29,15 +29,16 @@ plugins {
}
dependencies {
+ api(platform(project(":opendc-platform")))
api(project(":opendc-core"))
- api(project(":opendc-compute:opendc-compute-core"))
+ api(project(":opendc-compute:opendc-compute-api"))
api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
- implementation("io.github.microutils:kotlin-logging:${versions.kotlinLogging}")
+ implementation("io.github.microutils:kotlin-logging")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
testImplementation(project(":opendc-compute:opendc-compute-simulator"))
testImplementation(project(":opendc-format"))
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}")
- testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}")
+ testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl")
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index e04c8a4c..6b348ed4 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -25,16 +25,10 @@ package org.opendc.workflows.service
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
-import org.opendc.compute.core.virt.service.VirtProvisioningService
+import org.opendc.compute.api.*
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.consumeAsFlow
import org.opendc.trace.core.enable
@@ -55,13 +49,13 @@ public class StageWorkflowService(
internal val coroutineScope: CoroutineScope,
internal val clock: Clock,
internal val tracer: EventTracer,
- private val provisioningService: VirtProvisioningService,
+ private val computeClient: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
taskOrderPolicy: TaskOrderPolicy
-) : WorkflowService {
+) : WorkflowService, ServerWatcher {
/**
* The logger instance to use.
*/
@@ -103,12 +97,6 @@ public class StageWorkflowService(
internal val taskByServer = mutableMapOf<Server, TaskState>()
/**
- * The load of the system.
- */
- internal val load: Double
- get() = (activeTasks.size / provisioningService.hostCount.toDouble())
-
- /**
* The root listener of this scheduler.
*/
private val rootListener = object : StageWorkflowSchedulerListener {
@@ -205,7 +193,7 @@ public class StageWorkflowService(
/**
* Indicate to the scheduler that a scheduling cycle is needed.
*/
- private suspend fun requestCycle() = mode.requestCycle()
+ private fun requestCycle() = mode.requestCycle()
/**
* Perform a scheduling cycle immediately.
@@ -273,15 +261,13 @@ public class StageWorkflowService(
val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task
val image = instance.task.image
coroutineScope.launch {
- val server = provisioningService.deploy(instance.task.name, image, flavor)
+ val server = computeClient.newServer(instance.task.name, image, flavor)
instance.state = TaskStatus.ACTIVE
instance.server = server
taskByServer[server] = instance
- server.events
- .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
- .launchIn(coroutineScope)
+ server.watch(this@StageWorkflowService)
}
activeTasks += instance
@@ -290,8 +276,8 @@ public class StageWorkflowService(
}
}
- private suspend fun stateChanged(server: Server) {
- when (server.state) {
+ public override fun onStateChanged(server: Server, newState: ServerState) {
+ when (newState) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
index d1eb6704..ef9714c2 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
@@ -22,7 +22,7 @@
package org.opendc.workflows.service
-import org.opendc.compute.core.Server
+import org.opendc.compute.api.Server
import org.opendc.workflows.workload.Task
public class TaskState(public val job: JobState, public val task: Task) {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
index d03adc61..cf8f92e0 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,6 @@ package org.opendc.workflows.service
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
import org.opendc.workflows.service.stage.StagePolicy
/**
@@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
/**
* Request a new scheduling cycle to be performed.
*/
- public suspend fun requestCycle()
+ public fun requestCycle()
}
/**
@@ -46,9 +45,8 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
*/
public object Interactive : WorkflowSchedulerMode() {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
- yield()
- scheduler.schedule()
+ override fun requestCycle() {
+ scheduler.coroutineScope.launch { scheduler.schedule() }
}
}
@@ -62,7 +60,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
@@ -88,7 +86,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
val delay = random.nextInt(200).toLong()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
deleted file mode 100644
index 4f0c269a..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.job
-
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowService
-
-/**
- * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load.
- *
- * @property limit The maximum load before stopping admission.
- */
-public data class LoadJobAdmissionPolicy(public val limit: Double) : JobAdmissionPolicy {
- override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
- override fun invoke(
- job: JobState
- ): JobAdmissionPolicy.Advice =
- if (scheduler.load < limit)
- JobAdmissionPolicy.Advice.ADMIT
- else
- JobAdmissionPolicy.Advice.STOP
- }
-
- override fun toString(): String = "Limit-Load($limit)"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
deleted file mode 100644
index a80a8c63..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.task
-
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
-
-/**
- * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load.
- */
-public data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy {
- override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
- override fun invoke(
- task: TaskState
- ): TaskEligibilityPolicy.Advice =
- if (scheduler.load < limit)
- TaskEligibilityPolicy.Advice.ADMIT
- else
- TaskEligibilityPolicy.Advice.STOP
- }
-
- override fun toString(): String = "Limit-Load($limit)"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
index 1834a4c8..4c6d2842 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
@@ -24,7 +24,7 @@
package org.opendc.workflows.workload
-import org.opendc.compute.core.image.Image
+import org.opendc.compute.api.Image
import org.opendc.core.Identity
import java.util.*
diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 2bfcba35..4207cdfd 100644
--- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -36,11 +36,12 @@ import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.simulator.SimHostProvisioner
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
+import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
@@ -80,7 +81,11 @@ internal class StageWorkflowSchedulerIntegrationTest {
// Wait for the bare metal nodes to be spawned
delay(10)
- val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000)
+ val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider())
+ val hosts = provisioner.provisionAll()
+ val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000)
+
+ hosts.forEach { compute.addHost(it) }
// Wait for the hypervisors to be spawned
delay(10)
@@ -89,7 +94,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
testScope,
clock,
tracer,
- provisioner,
+ compute.newClient(),
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflows/src/test/resources/log4j2.xml
new file mode 100644
index 00000000..70a0eacc
--- /dev/null
+++ b/simulator/opendc-workflows/src/test/resources/log4j2.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2021 AtLarge Research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN" packages="org.apache.logging.log4j.core">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="warn">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>