diff options
Diffstat (limited to 'opendc-experiments')
13 files changed, 54 insertions, 377 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt index 8fb4a938..8c1fa803 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt @@ -29,9 +29,9 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.yield -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.Task +import org.opendc.compute.api.TaskState +import org.opendc.compute.api.TaskWatcher import org.opendc.compute.failure.models.FailureModel import org.opendc.compute.service.ComputeService import org.opendc.compute.workload.VirtualMachine @@ -44,13 +44,13 @@ import kotlin.coroutines.coroutineContext import kotlin.math.max /** - * A watcher that is locked and waits for a change in the server state to unlock - * @param unlockStates determine which [ServerState] triggers an unlock. + * A watcher that is locked and waits for a change in the task state to unlock + * @param unlockStates determine which [TaskState] triggers an unlock. * Default values are TERMINATED, ERROR, and DELETED. */ -public class RunningServerWatcher : ServerWatcher { +public class RunningTaskWatcher : TaskWatcher { // TODO: make this changeable - private val unlockStates: List<ServerState> = listOf(ServerState.DELETED, ServerState.TERMINATED) + private val unlockStates: List<TaskState> = listOf(TaskState.DELETED, TaskState.TERMINATED) private val mutex: Mutex = Mutex() @@ -63,8 +63,8 @@ public class RunningServerWatcher : ServerWatcher { } override fun onStateChanged( - server: Server, - newState: ServerState, + task: Task, + newState: TaskState, ) { if (unlockStates.contains(newState)) { mutex.unlock() @@ -78,7 +78,7 @@ public class RunningServerWatcher : ServerWatcher { * @param clock The simulation clock. * @param trace The trace to simulate. * @param seed The seed to use for randomness. - * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). + * @param submitImmediately A flag to indicate that the tasks are scheduled immediately (so not at their start time). * @param failureModelSpec A failure model to use for injecting failures. */ public suspend fun ComputeService.replay( @@ -111,12 +111,12 @@ public suspend fun ComputeService.replay( val now = clock.millis() val start = entry.startTime.toEpochMilli() - // Set the simulationOffset based on the starting time of the first server + // Set the simulationOffset based on the starting time of the first task if (simulationOffset == Long.MIN_VALUE) { simulationOffset = start - now } - // Delay the server based on the startTime given by the trace. + // Delay the task based on the startTime given by the trace. if (!submitImmediately) { delay(max(0, (start - now - simulationOffset))) } @@ -135,8 +135,8 @@ public suspend fun ComputeService.replay( val meta = mutableMapOf<String, Any>("workload" to workload) launch { - val server = - client.newServer( + val task = + client.newTask( entry.name, image, client.newFlavor( @@ -148,15 +148,15 @@ public suspend fun ComputeService.replay( meta = meta, ) - val serverWatcher = RunningServerWatcher() - serverWatcher.lock() - server.watch(serverWatcher) + val taskWatcher = RunningTaskWatcher() + taskWatcher.lock() + task.watch(taskWatcher) - // Wait until the server is terminated - serverWatcher.wait() + // Wait until the task is terminated + taskWatcher.wait() - // Stop the server after reaching the end-time of the virtual machine - server.delete() + // Stop the task after reaching the end-time of the virtual machine + task.delete() } } } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 0f76d580..abd359ff 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -122,14 +122,14 @@ public fun runScenario( ) val workloadLoader = ComputeWorkloadLoader(File(scenario.workloadSpec.pathToFile)) - val vms = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed)) + val tasks = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed)) val carbonTrace = getCarbonTrace(scenario.carbonTracePath) - val startTime = Duration.ofMillis(vms.minOf { it.startTime }.toEpochMilli()) + val startTime = Duration.ofMillis(tasks.minOf { it.startTime }.toEpochMilli()) addExportModel(provisioner, serviceDomain, scenario, seed, startTime, carbonTrace, scenario.id) val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! - service.replay(timeSource, vms, failureModelSpec = scenario.failureModelSpec, seed = seed) + service.replay(timeSource, tasks, failureModelSpec = scenario.failureModelSpec, seed = seed) } } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/WorkloadSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/WorkloadSpec.kt index c104d759..956e97f1 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/WorkloadSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/WorkloadSpec.kt @@ -67,6 +67,6 @@ public enum class WorkloadTypes { */ public fun getWorkloadType(type: WorkloadTypes): ComputeWorkload { return when (type) { - WorkloadTypes.ComputeWorkload -> trace("trace").sampleByLoad(1.0) + WorkloadTypes.ComputeWorkload -> trace().sampleByLoad(1.0) } } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt index 5ad1ecde..301d507b 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt @@ -110,16 +110,16 @@ class ScenarioIntegrationTest { "Success=${monitor.attemptsSuccess} " + "Failure=${monitor.attemptsFailure} " + "Error=${monitor.attemptsError} " + - "Pending=${monitor.serversPending} " + - "Active=${monitor.serversActive}", + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", ) // Note that these values have been verified beforehand assertAll( { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") }, { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") }, + { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") }, { assertEquals(43795971955, monitor.idleTime) { "Incorrect idle time" } }, { assertEquals(2864995687, monitor.activeTime) { "Incorrect active time" } }, { assertEquals(148, monitor.stealTime) { "Incorrect steal time" } }, @@ -156,8 +156,8 @@ class ScenarioIntegrationTest { "Success=${monitor.attemptsSuccess} " + "Failure=${monitor.attemptsFailure} " + "Error=${monitor.attemptsError} " + - "Pending=${monitor.serversPending} " + - "Active=${monitor.serversActive}", + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", ) // Note that these values have been verified beforehand @@ -197,8 +197,8 @@ class ScenarioIntegrationTest { "Success=${monitor.attemptsSuccess} " + "Failure=${monitor.attemptsFailure} " + "Error=${monitor.attemptsError} " + - "Pending=${monitor.serversPending} " + - "Active=${monitor.serversActive}", + "Pending=${monitor.tasksPending} " + + "Active=${monitor.tasksActive}", ) // Note that these values have been verified beforehand @@ -265,15 +265,15 @@ class ScenarioIntegrationTest { var attemptsSuccess = 0 var attemptsFailure = 0 var attemptsError = 0 - var serversPending = 0 - var serversActive = 0 + var tasksPending = 0 + var tasksActive = 0 override fun record(reader: ServiceTableReader) { attemptsSuccess = reader.attemptsSuccess attemptsFailure = reader.attemptsFailure attemptsError = reader.attemptsError - serversPending = reader.serversPending - serversActive = reader.serversActive + tasksPending = reader.tasksPending + tasksActive = reader.tasksActive } var idleTime = 0L diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt index f10ab310..0b32b8f6 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt @@ -51,7 +51,7 @@ class ScenarioRunnerTest { Topology("topology"), Workload("bitbrains-small", trace("bitbrains-small")), OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - "active-servers" + "active-tasks" ) assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } @@ -70,7 +70,7 @@ class ScenarioRunnerTest { Topology("topology"), Workload("bitbrains-small", trace("bitbrains-small")), OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - "active-servers" + "active-tasks" ) assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/model.json b/opendc-experiments/opendc-experiments-base/src/test/resources/model.json new file mode 100644 index 00000000..91e2657f --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/model.json @@ -0,0 +1,15 @@ +{ + "version": "1.0", + "defaultSchema": "trace", + "schemas": [ + { + "name": "trace", + "type": "custom", + "factory": "org.opendc.trace.calcite.TraceSchemaFactory", + "operand": { + "path": "trace", + "format": "opendc-vm" + } + } + ] +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet Binary files differindex 9d953956..9d953956 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/trace.parquet +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet Binary files differindex 9cded35f..9cded35f 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/meta.parquet +++ b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet diff --git a/opendc-experiments/opendc-experiments-workflow/build.gradle.kts b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts deleted file mode 100644 index ff5144c5..00000000 --- a/opendc-experiments/opendc-experiments-workflow/build.gradle.kts +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Support library for simulating workflows with OpenDC" - -// Build configuration -plugins { - `kotlin-library-conventions` - `testing-conventions` - `jacoco-conventions` -} - -dependencies { - api(projects.opendcWorkflow.opendcWorkflowApi) - - implementation(libs.kotlinx.coroutines) - implementation(projects.opendcCompute.opendcComputeService) - implementation(projects.opendcWorkflow.opendcWorkflowService) - implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcTrace.opendcTraceApi) - implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-simulator"))) -} diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt deleted file mode 100644 index e396901c..00000000 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("TraceHelpers") - -package org.opendc.experiments.workflow - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import org.opendc.simulator.compute.workload.SimWorkloads -import org.opendc.trace.Trace -import org.opendc.trace.conv.TABLE_TASKS -import org.opendc.trace.conv.TASK_ALLOC_NCPUS -import org.opendc.trace.conv.TASK_ID -import org.opendc.trace.conv.TASK_PARENTS -import org.opendc.trace.conv.TASK_REQ_NCPUS -import org.opendc.trace.conv.TASK_RUNTIME -import org.opendc.trace.conv.TASK_SUBMIT_TIME -import org.opendc.trace.conv.TASK_WORKFLOW_ID -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import org.opendc.workflow.service.WorkflowService -import java.time.InstantSource -import java.util.UUID -import kotlin.collections.HashMap -import kotlin.collections.HashSet -import kotlin.math.min - -/** - * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service. - */ -public fun Trace.toJobs(): List<Job> { - val table = checkNotNull(getTable(TABLE_TASKS)) - val reader = table.newReader() - - val jobs = mutableMapOf<Long, Job>() - val tasks = mutableMapOf<Long, Task>() - val taskDependencies = mutableMapOf<Task, Set<Long>>() - - try { - while (reader.nextRow()) { - // Bag of tasks without workflow ID all share the same workflow - val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L - val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } - - val id = reader.getString(TASK_ID)!!.toLong() - val grantedCpus = - if (reader.resolve(TASK_ALLOC_NCPUS) != 0) { - reader.getInt(TASK_ALLOC_NCPUS) - } else { - reader.getInt(TASK_REQ_NCPUS) - } - val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!! - val runtime = reader.getDuration(TASK_RUNTIME)!! - val flops: Long = 4000 * runtime.seconds * grantedCpus - val workload = SimWorkloads.flops(flops, 1.0) - val task = - Task( - UUID(0L, id), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to grantedCpus, - WORKFLOW_TASK_DEADLINE to runtime.toMillis(), - ), - ) - - tasks[id] = task - taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet() - - (workflow.metadata as MutableMap<String, Any>).merge( - "WORKFLOW_SUBMIT_TIME", - submitTime.toEpochMilli(), - ) { a, b -> min(a as Long, b as Long) } - (workflow.tasks as MutableSet<Task>).add(task) - } - - // Resolve dependencies for all tasks - for ((task, deps) in taskDependencies) { - for (dep in deps) { - val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } - (task.dependencies as MutableSet<Task>).add(parent) - } - } - } finally { - reader.close() - } - - return jobs.values.toList() -} - -/** - * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished. - */ -public suspend fun WorkflowService.replay( - clock: InstantSource, - jobs: List<Job>, -) { - // Sort jobs by their arrival time - val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } - if (orderedJobs.isEmpty()) { - return - } - - // Wait until the trace is started - val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long - var offset = 0L - - if (startTime != Long.MAX_VALUE) { - offset = startTime - clock.millis() - delay(offset.coerceAtLeast(0)) - } - - coroutineScope { - for (job in orderedJobs) { - val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long - if (submitTime != Long.MAX_VALUE) { - delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0)) - } - - launch { invoke(job) } - } - } -} diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt deleted file mode 100644 index cb8056a7..00000000 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.workflow - -import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy -import org.opendc.workflow.service.scheduler.job.JobOrderPolicy -import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy -import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy -import java.time.Duration - -/** - * Specification of the scheduling policies of the workflow scheduler. - */ -public data class WorkflowSchedulerSpec( - val schedulingQuantum: Duration, - val jobAdmissionPolicy: JobAdmissionPolicy, - val jobOrderPolicy: JobOrderPolicy, - val taskEligibilityPolicy: TaskEligibilityPolicy, - val taskOrderPolicy: TaskOrderPolicy, -) diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt deleted file mode 100644 index af2a4871..00000000 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.workflow - -import org.opendc.compute.service.ComputeService -import org.opendc.compute.simulator.provisioner.ProvisioningContext -import org.opendc.compute.simulator.provisioner.ProvisioningStep -import org.opendc.workflow.service.WorkflowService -import java.time.Duration - -/** - * A [ProvisioningStep] that provisions a [WorkflowService]. - * - * @param serviceDomain The domain name under which to register the workflow service. - * @param computeService The domain name where the underlying compute service is located. - * @param scheduler The configuration of the scheduler of the workflow engine. - * @param schedulingQuantum The scheduling quantum of the compute scheduler. - */ -public class WorkflowServiceProvisioningStep internal constructor( - private val serviceDomain: String, - private val computeService: String, - private val scheduler: WorkflowSchedulerSpec, - private val schedulingQuantum: Duration, -) : ProvisioningStep { - override fun apply(ctx: ProvisioningContext): AutoCloseable { - val computeService = - requireNotNull( - ctx.registry.resolve(computeService, ComputeService::class.java), - ) { "Compute service $computeService does not exist" } - - val client = computeService.newClient() - val service = - WorkflowService( - ctx.dispatcher, - client, - scheduler.schedulingQuantum, - jobAdmissionPolicy = scheduler.jobAdmissionPolicy, - jobOrderPolicy = scheduler.jobOrderPolicy, - taskEligibilityPolicy = scheduler.taskEligibilityPolicy, - taskOrderPolicy = scheduler.taskOrderPolicy, - ) - ctx.registry.register(serviceDomain, WorkflowService::class.java, service) - - return AutoCloseable { - service.close() - client.close() - } - } -} diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt deleted file mode 100644 index bfcf3734..00000000 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("WorkflowSteps") - -package org.opendc.experiments.workflow - -import org.opendc.compute.simulator.provisioner.ProvisioningStep -import org.opendc.workflow.service.WorkflowService -import java.time.Duration - -/** - * Return a [ProvisioningStep] that sets up a [WorkflowService]. - */ -public fun setupWorkflowService( - serviceDomain: String, - computeService: String, - scheduler: WorkflowSchedulerSpec, - schedulingQuantum: Duration = Duration.ofMinutes(5), -): ProvisioningStep { - return WorkflowServiceProvisioningStep(serviceDomain, computeService, scheduler, schedulingQuantum) -} |
