summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-08-27 13:48:46 +0200
committerGitHub <noreply@github.com>2024-08-27 13:48:46 +0200
commit3363df4c72a064e590ca98f8e01832cfa4e15a3f (patch)
tree9a938700fe08ce344ff5d0d475d0b64d7233d1fc /opendc-experiments
parentc21708013f2746807f5bdb3fc47c2b47ed15b7c8 (diff)
Renamed input files and internally server is changed to task (#246)
* Updated SimTrace to use a single ArrayDeque instead of three separate lists for deadline, cpuUsage, and coreCount * Renamed input files to tasks.parquet and fragments.parquet. Renamed server to task. OpenDC nows exports tasks.parquet instead of server.parquet
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt42
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt6
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/WorkloadSpec.kt2
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt24
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt4
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/resources/model.json15
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet (renamed from opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/trace.parquet)bin2163354 -> 2163354 bytes
-rw-r--r--opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet (renamed from opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/meta.parquet)bin2723 -> 2723 bytes
-rw-r--r--opendc-experiments/opendc-experiments-workflow/build.gradle.kts41
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt147
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt40
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt69
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt41
13 files changed, 54 insertions, 377 deletions
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt
index 8fb4a938..8c1fa803 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt
@@ -29,9 +29,9 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.yield
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.api.ServerWatcher
+import org.opendc.compute.api.Task
+import org.opendc.compute.api.TaskState
+import org.opendc.compute.api.TaskWatcher
import org.opendc.compute.failure.models.FailureModel
import org.opendc.compute.service.ComputeService
import org.opendc.compute.workload.VirtualMachine
@@ -44,13 +44,13 @@ import kotlin.coroutines.coroutineContext
import kotlin.math.max
/**
- * A watcher that is locked and waits for a change in the server state to unlock
- * @param unlockStates determine which [ServerState] triggers an unlock.
+ * A watcher that is locked and waits for a change in the task state to unlock
+ * @param unlockStates determine which [TaskState] triggers an unlock.
* Default values are TERMINATED, ERROR, and DELETED.
*/
-public class RunningServerWatcher : ServerWatcher {
+public class RunningTaskWatcher : TaskWatcher {
// TODO: make this changeable
- private val unlockStates: List<ServerState> = listOf(ServerState.DELETED, ServerState.TERMINATED)
+ private val unlockStates: List<TaskState> = listOf(TaskState.DELETED, TaskState.TERMINATED)
private val mutex: Mutex = Mutex()
@@ -63,8 +63,8 @@ public class RunningServerWatcher : ServerWatcher {
}
override fun onStateChanged(
- server: Server,
- newState: ServerState,
+ task: Task,
+ newState: TaskState,
) {
if (unlockStates.contains(newState)) {
mutex.unlock()
@@ -78,7 +78,7 @@ public class RunningServerWatcher : ServerWatcher {
* @param clock The simulation clock.
* @param trace The trace to simulate.
* @param seed The seed to use for randomness.
- * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
+ * @param submitImmediately A flag to indicate that the tasks are scheduled immediately (so not at their start time).
* @param failureModelSpec A failure model to use for injecting failures.
*/
public suspend fun ComputeService.replay(
@@ -111,12 +111,12 @@ public suspend fun ComputeService.replay(
val now = clock.millis()
val start = entry.startTime.toEpochMilli()
- // Set the simulationOffset based on the starting time of the first server
+ // Set the simulationOffset based on the starting time of the first task
if (simulationOffset == Long.MIN_VALUE) {
simulationOffset = start - now
}
- // Delay the server based on the startTime given by the trace.
+ // Delay the task based on the startTime given by the trace.
if (!submitImmediately) {
delay(max(0, (start - now - simulationOffset)))
}
@@ -135,8 +135,8 @@ public suspend fun ComputeService.replay(
val meta = mutableMapOf<String, Any>("workload" to workload)
launch {
- val server =
- client.newServer(
+ val task =
+ client.newTask(
entry.name,
image,
client.newFlavor(
@@ -148,15 +148,15 @@ public suspend fun ComputeService.replay(
meta = meta,
)
- val serverWatcher = RunningServerWatcher()
- serverWatcher.lock()
- server.watch(serverWatcher)
+ val taskWatcher = RunningTaskWatcher()
+ taskWatcher.lock()
+ task.watch(taskWatcher)
- // Wait until the server is terminated
- serverWatcher.wait()
+ // Wait until the task is terminated
+ taskWatcher.wait()
- // Stop the server after reaching the end-time of the virtual machine
- server.delete()
+ // Stop the task after reaching the end-time of the virtual machine
+ task.delete()
}
}
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
index 0f76d580..abd359ff 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt
@@ -122,14 +122,14 @@ public fun runScenario(
)
val workloadLoader = ComputeWorkloadLoader(File(scenario.workloadSpec.pathToFile))
- val vms = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed))
+ val tasks = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed))
val carbonTrace = getCarbonTrace(scenario.carbonTracePath)
- val startTime = Duration.ofMillis(vms.minOf { it.startTime }.toEpochMilli())
+ val startTime = Duration.ofMillis(tasks.minOf { it.startTime }.toEpochMilli())
addExportModel(provisioner, serviceDomain, scenario, seed, startTime, carbonTrace, scenario.id)
val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
- service.replay(timeSource, vms, failureModelSpec = scenario.failureModelSpec, seed = seed)
+ service.replay(timeSource, tasks, failureModelSpec = scenario.failureModelSpec, seed = seed)
}
}
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/WorkloadSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/WorkloadSpec.kt
index c104d759..956e97f1 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/WorkloadSpec.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/scenario/specs/WorkloadSpec.kt
@@ -67,6 +67,6 @@ public enum class WorkloadTypes {
*/
public fun getWorkloadType(type: WorkloadTypes): ComputeWorkload {
return when (type) {
- WorkloadTypes.ComputeWorkload -> trace("trace").sampleByLoad(1.0)
+ WorkloadTypes.ComputeWorkload -> trace().sampleByLoad(1.0)
}
}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
index 5ad1ecde..301d507b 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioIntegrationTest.kt
@@ -110,16 +110,16 @@ class ScenarioIntegrationTest {
"Success=${monitor.attemptsSuccess} " +
"Failure=${monitor.attemptsFailure} " +
"Error=${monitor.attemptsError} " +
- "Pending=${monitor.serversPending} " +
- "Active=${monitor.serversActive}",
+ "Pending=${monitor.tasksPending} " +
+ "Active=${monitor.tasksActive}",
)
// Note that these values have been verified beforehand
assertAll(
{ assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") },
- { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") },
+ { assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
{ assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
- { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") },
+ { assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
{ assertEquals(43795971955, monitor.idleTime) { "Incorrect idle time" } },
{ assertEquals(2864995687, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(148, monitor.stealTime) { "Incorrect steal time" } },
@@ -156,8 +156,8 @@ class ScenarioIntegrationTest {
"Success=${monitor.attemptsSuccess} " +
"Failure=${monitor.attemptsFailure} " +
"Error=${monitor.attemptsError} " +
- "Pending=${monitor.serversPending} " +
- "Active=${monitor.serversActive}",
+ "Pending=${monitor.tasksPending} " +
+ "Active=${monitor.tasksActive}",
)
// Note that these values have been verified beforehand
@@ -197,8 +197,8 @@ class ScenarioIntegrationTest {
"Success=${monitor.attemptsSuccess} " +
"Failure=${monitor.attemptsFailure} " +
"Error=${monitor.attemptsError} " +
- "Pending=${monitor.serversPending} " +
- "Active=${monitor.serversActive}",
+ "Pending=${monitor.tasksPending} " +
+ "Active=${monitor.tasksActive}",
)
// Note that these values have been verified beforehand
@@ -265,15 +265,15 @@ class ScenarioIntegrationTest {
var attemptsSuccess = 0
var attemptsFailure = 0
var attemptsError = 0
- var serversPending = 0
- var serversActive = 0
+ var tasksPending = 0
+ var tasksActive = 0
override fun record(reader: ServiceTableReader) {
attemptsSuccess = reader.attemptsSuccess
attemptsFailure = reader.attemptsFailure
attemptsError = reader.attemptsError
- serversPending = reader.serversPending
- serversActive = reader.serversActive
+ tasksPending = reader.tasksPending
+ tasksActive = reader.tasksActive
}
var idleTime = 0L
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt
index f10ab310..0b32b8f6 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt
+++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ScenarioRunnerTest.kt
@@ -51,7 +51,7 @@ class ScenarioRunnerTest {
Topology("topology"),
Workload("bitbrains-small", trace("bitbrains-small")),
OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true),
- "active-servers"
+ "active-tasks"
)
assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) }
@@ -70,7 +70,7 @@ class ScenarioRunnerTest {
Topology("topology"),
Workload("bitbrains-small", trace("bitbrains-small")),
OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true),
- "active-servers"
+ "active-tasks"
)
assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) }
diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/model.json b/opendc-experiments/opendc-experiments-base/src/test/resources/model.json
new file mode 100644
index 00000000..91e2657f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-base/src/test/resources/model.json
@@ -0,0 +1,15 @@
+{
+ "version": "1.0",
+ "defaultSchema": "trace",
+ "schemas": [
+ {
+ "name": "trace",
+ "type": "custom",
+ "factory": "org.opendc.trace.calcite.TraceSchemaFactory",
+ "operand": {
+ "path": "trace",
+ "format": "opendc-vm"
+ }
+ }
+ ]
+}
diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet
index 9d953956..9d953956 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/trace.parquet
+++ b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/fragments.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet
index 9cded35f..9cded35f 100644
--- a/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/meta.parquet
+++ b/opendc-experiments/opendc-experiments-base/src/test/resources/trace/bitbrains-small/tasks.parquet
Binary files differ
diff --git a/opendc-experiments/opendc-experiments-workflow/build.gradle.kts b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts
deleted file mode 100644
index ff5144c5..00000000
--- a/opendc-experiments/opendc-experiments-workflow/build.gradle.kts
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-description = "Support library for simulating workflows with OpenDC"
-
-// Build configuration
-plugins {
- `kotlin-library-conventions`
- `testing-conventions`
- `jacoco-conventions`
-}
-
-dependencies {
- api(projects.opendcWorkflow.opendcWorkflowApi)
-
- implementation(libs.kotlinx.coroutines)
- implementation(projects.opendcCompute.opendcComputeService)
- implementation(projects.opendcWorkflow.opendcWorkflowService)
- implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcTrace.opendcTraceApi)
- implementation(project(mapOf("path" to ":opendc-compute:opendc-compute-simulator")))
-}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
deleted file mode 100644
index e396901c..00000000
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("TraceHelpers")
-
-package org.opendc.experiments.workflow
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import org.opendc.simulator.compute.workload.SimWorkloads
-import org.opendc.trace.Trace
-import org.opendc.trace.conv.TABLE_TASKS
-import org.opendc.trace.conv.TASK_ALLOC_NCPUS
-import org.opendc.trace.conv.TASK_ID
-import org.opendc.trace.conv.TASK_PARENTS
-import org.opendc.trace.conv.TASK_REQ_NCPUS
-import org.opendc.trace.conv.TASK_RUNTIME
-import org.opendc.trace.conv.TASK_SUBMIT_TIME
-import org.opendc.trace.conv.TASK_WORKFLOW_ID
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import org.opendc.workflow.service.WorkflowService
-import java.time.InstantSource
-import java.util.UUID
-import kotlin.collections.HashMap
-import kotlin.collections.HashSet
-import kotlin.math.min
-
-/**
- * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service.
- */
-public fun Trace.toJobs(): List<Job> {
- val table = checkNotNull(getTable(TABLE_TASKS))
- val reader = table.newReader()
-
- val jobs = mutableMapOf<Long, Job>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, Set<Long>>()
-
- try {
- while (reader.nextRow()) {
- // Bag of tasks without workflow ID all share the same workflow
- val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L
- val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
-
- val id = reader.getString(TASK_ID)!!.toLong()
- val grantedCpus =
- if (reader.resolve(TASK_ALLOC_NCPUS) != 0) {
- reader.getInt(TASK_ALLOC_NCPUS)
- } else {
- reader.getInt(TASK_REQ_NCPUS)
- }
- val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!!
- val runtime = reader.getDuration(TASK_RUNTIME)!!
- val flops: Long = 4000 * runtime.seconds * grantedCpus
- val workload = SimWorkloads.flops(flops, 1.0)
- val task =
- Task(
- UUID(0L, id),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to grantedCpus,
- WORKFLOW_TASK_DEADLINE to runtime.toMillis(),
- ),
- )
-
- tasks[id] = task
- taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet()
-
- (workflow.metadata as MutableMap<String, Any>).merge(
- "WORKFLOW_SUBMIT_TIME",
- submitTime.toEpochMilli(),
- ) { a, b -> min(a as Long, b as Long) }
- (workflow.tasks as MutableSet<Task>).add(task)
- }
-
- // Resolve dependencies for all tasks
- for ((task, deps) in taskDependencies) {
- for (dep in deps) {
- val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
- (task.dependencies as MutableSet<Task>).add(parent)
- }
- }
- } finally {
- reader.close()
- }
-
- return jobs.values.toList()
-}
-
-/**
- * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished.
- */
-public suspend fun WorkflowService.replay(
- clock: InstantSource,
- jobs: List<Job>,
-) {
- // Sort jobs by their arrival time
- val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
- if (orderedJobs.isEmpty()) {
- return
- }
-
- // Wait until the trace is started
- val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
- var offset = 0L
-
- if (startTime != Long.MAX_VALUE) {
- offset = startTime - clock.millis()
- delay(offset.coerceAtLeast(0))
- }
-
- coroutineScope {
- for (job in orderedJobs) {
- val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
- if (submitTime != Long.MAX_VALUE) {
- delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0))
- }
-
- launch { invoke(job) }
- }
- }
-}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
deleted file mode 100644
index cb8056a7..00000000
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.workflow
-
-import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
-import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
-import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
-import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
-import java.time.Duration
-
-/**
- * Specification of the scheduling policies of the workflow scheduler.
- */
-public data class WorkflowSchedulerSpec(
- val schedulingQuantum: Duration,
- val jobAdmissionPolicy: JobAdmissionPolicy,
- val jobOrderPolicy: JobOrderPolicy,
- val taskEligibilityPolicy: TaskEligibilityPolicy,
- val taskOrderPolicy: TaskOrderPolicy,
-)
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
deleted file mode 100644
index af2a4871..00000000
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.experiments.workflow
-
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.simulator.provisioner.ProvisioningContext
-import org.opendc.compute.simulator.provisioner.ProvisioningStep
-import org.opendc.workflow.service.WorkflowService
-import java.time.Duration
-
-/**
- * A [ProvisioningStep] that provisions a [WorkflowService].
- *
- * @param serviceDomain The domain name under which to register the workflow service.
- * @param computeService The domain name where the underlying compute service is located.
- * @param scheduler The configuration of the scheduler of the workflow engine.
- * @param schedulingQuantum The scheduling quantum of the compute scheduler.
- */
-public class WorkflowServiceProvisioningStep internal constructor(
- private val serviceDomain: String,
- private val computeService: String,
- private val scheduler: WorkflowSchedulerSpec,
- private val schedulingQuantum: Duration,
-) : ProvisioningStep {
- override fun apply(ctx: ProvisioningContext): AutoCloseable {
- val computeService =
- requireNotNull(
- ctx.registry.resolve(computeService, ComputeService::class.java),
- ) { "Compute service $computeService does not exist" }
-
- val client = computeService.newClient()
- val service =
- WorkflowService(
- ctx.dispatcher,
- client,
- scheduler.schedulingQuantum,
- jobAdmissionPolicy = scheduler.jobAdmissionPolicy,
- jobOrderPolicy = scheduler.jobOrderPolicy,
- taskEligibilityPolicy = scheduler.taskEligibilityPolicy,
- taskOrderPolicy = scheduler.taskOrderPolicy,
- )
- ctx.registry.register(serviceDomain, WorkflowService::class.java, service)
-
- return AutoCloseable {
- service.close()
- client.close()
- }
- }
-}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
deleted file mode 100644
index bfcf3734..00000000
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("WorkflowSteps")
-
-package org.opendc.experiments.workflow
-
-import org.opendc.compute.simulator.provisioner.ProvisioningStep
-import org.opendc.workflow.service.WorkflowService
-import java.time.Duration
-
-/**
- * Return a [ProvisioningStep] that sets up a [WorkflowService].
- */
-public fun setupWorkflowService(
- serviceDomain: String,
- computeService: String,
- scheduler: WorkflowSchedulerSpec,
- schedulingQuantum: Duration = Duration.ofMinutes(5),
-): ProvisioningStep {
- return WorkflowServiceProvisioningStep(serviceDomain, computeService, scheduler, schedulingQuantum)
-}