summaryrefslogtreecommitdiff
path: root/opendc-workflow
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-09 10:31:41 +0200
committerGitHub <noreply@github.com>2022-06-09 10:31:41 +0200
commitd146814bbbb86bfcb19ccb94250424703e9179e5 (patch)
treebf20f51b434d56e60ad013568ac1a32b912a3b5e /opendc-workflow
parent61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
parent9d759c9bc987965fae8b0c16c000772c546bf3a2 (diff)
merge: Introduce schema for trace API (#88)
This pull request updates the OpenDC trace API to support proper specification of a schema of the tables exposed by the traces. This functionality makes it easier for the API consumer to understand the types exposed by the API. ## Implementation Notes :hammer_and_pick: * Introduce type system for trace API * Add benchmarks for odcvm trace format * Add benchmarks for Azure trace format * Add conformance suite for OpenDC trace API ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * Removal of typed `TableColumn`. Instead, `TableColumn` instances are now used to describe the columns belonging to some table. * `TableReader` and `TableWriter` do not support accessing arbitrary objects anymore. Instead, only the types supported by the type system are exposed.
Diffstat (limited to 'opendc-workflow')
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt12
1 files changed, 6 insertions, 6 deletions
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
index 3aa4463c..5f57723b 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
@@ -49,16 +49,16 @@ public fun Trace.toJobs(): List<Job> {
try {
while (reader.nextRow()) {
// Bag of tasks without workflow ID all share the same workflow
- val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L
+ val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L
val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
- val id = reader.get(TASK_ID).toLong()
- val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS))
+ val id = reader.getString(TASK_ID)!!.toLong()
+ val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0)
reader.getInt(TASK_ALLOC_NCPUS)
else
reader.getInt(TASK_REQ_NCPUS)
- val submitTime = reader.get(TASK_SUBMIT_TIME)
- val runtime = reader.get(TASK_RUNTIME)
+ val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!!
+ val runtime = reader.getDuration(TASK_RUNTIME)!!
val flops: Long = 4000 * runtime.seconds * grantedCpus
val workload = SimFlopsWorkload(flops)
val task = Task(
@@ -73,7 +73,7 @@ public fun Trace.toJobs(): List<Job> {
)
tasks[id] = task
- taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet()
+ taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet()
(workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
(workflow.tasks as MutableSet<Task>).add(task)