summaryrefslogtreecommitdiff
path: root/database/gwf_converter/gwf_converter.py
diff options
context:
space:
mode:
author    Georgios Andreadis <g.andreadis@student.tudelft.nl> 2018-02-06 11:59:54 +0100
committer Georgios Andreadis <g.andreadis@student.tudelft.nl> 2018-02-06 11:59:54 +0100
commit    e7880192e47115abf648310f0f48883da18c1bb0 (patch)
tree      e65add15228d89e119284174a1e219d976f59daa /database/gwf_converter/gwf_converter.py
parent    6b078c67594fac6bf5ee0204552d6330a5fff2d6 (diff)
Add GWF conversion script
Diffstat (limited to 'database/gwf_converter/gwf_converter.py')
-rw-r--r--  database/gwf_converter/gwf_converter.py  99
1 files changed, 99 insertions, 0 deletions
diff --git a/database/gwf_converter/gwf_converter.py b/database/gwf_converter/gwf_converter.py
new file mode 100644
index 00000000..0745fac3
--- /dev/null
+++ b/database/gwf_converter/gwf_converter.py
@@ -0,0 +1,99 @@
+import os
+import sys
+
+import mysql.connector as mariadb
+
+
class Job:
    """One workflow ("job") read from a GWF trace file."""

    def __init__(self, gwf_id):
        # Identifier of this job in the GWF source file.
        self.gwf_id = gwf_id
        # Database primary key, assigned on insert; -1 means "not yet stored".
        self.db_id = -1
        # Task instances belonging to this job.
        self.tasks = []
+
+
class Task:
    """A single task of a GWF job, with its timing and dependency data."""

    def __init__(self, gwf_id, job, submit_time, run_time, num_processors, dependency_gwf_ids):
        # Identifier of this task in the GWF source file.
        self.gwf_id = gwf_id
        # Owning Job instance.
        self.job = job
        self.submit_time = submit_time
        self.run_time = run_time
        # Estimated total work in FLOPs; assumes each processor delivers
        # 10^9 FLOP/s for the whole run time — TODO confirm this rate.
        self.flops = 10 ** 9 * run_time * num_processors
        # GWF ids of prerequisite tasks, resolved to Task objects later.
        self.dependency_gwf_ids = dependency_gwf_ids
        # Database primary key, assigned on insert; -1 means "not yet stored".
        self.db_id = -1
        # Resolved prerequisite Task objects (filled in after parsing).
        self.dependencies = []
+
+
def get_jobs_from_gwf_file(file_name):
    """Parse a GWF trace file into Job objects with linked Task dependencies.

    :param file_name: path to the .gwf file; comma-separated columns, first
        line is a header, lines starting with '#' are comments
    :return: the parsed Job objects (a dict view)
    """
    jobs = {}
    tasks = {}

    with open(file_name, "r") as f:
        # Skip first CSV header line
        f.readline()

        for line in f:
            # Skip comment lines and blank lines.
            if line.startswith("#") or not line.strip():
                continue

            values = [col.strip() for col in line.split(",")]
            # Convert only the five leading columns this script actually uses.
            # The previous code int-converted every column but the last and
            # then unpacked exactly six names, which raised ValueError on any
            # trace line with extra columns; column 5 (req_num_processors)
            # was never used anyway.
            job_id, task_id, submit_time, run_time, num_processors = \
                [int(v) for v in values[:5]]
            # Last column: space-separated GWF ids of prerequisite tasks.
            dependency_gwf_ids = [int(val) for val in values[-1].split(" ") if val != ""]

            if job_id not in jobs:
                jobs[job_id] = Job(job_id)

            new_task = Task(task_id, jobs[job_id], submit_time, run_time,
                            num_processors, dependency_gwf_ids)
            # NOTE(review): assumes task ids are unique across the whole
            # trace, not just within a job — confirm against the GWF data.
            tasks[task_id] = new_task
            jobs[job_id].tasks.append(new_task)

    # Second pass: resolve dependency ids to Task objects only after the
    # whole file is read, since a dependency may reference a later line.
    for task in tasks.values():
        for dependency_gwf_id in task.dependency_gwf_ids:
            if dependency_gwf_id in tasks:
                task.dependencies.append(tasks[dependency_gwf_id])

    return jobs.values()
+
+
def write_to_db(trace_name, jobs):
    """Insert a parsed GWF trace (jobs, tasks, dependencies) into the DB.

    :param trace_name: name to record for the trace (may be user-supplied)
    :param jobs: iterable of Job objects as produced by the GWF parser
    """
    conn = mariadb.connect(user='opendc', password='opendcpassword', database='opendc')
    cursor = conn.cursor()

    def insert(sql, params):
        # Parameterized insert: the driver escapes the values, so a
        # user-supplied trace name can no longer break or inject SQL
        # (the previous code spliced values in with % formatting).
        # Errors are reported but, keeping the original best-effort
        # behavior, do not abort the import.
        try:
            cursor.execute(sql, params)
        except mariadb.Error as error:
            print("SQL Error: {}".format(error))
        conn.commit()
        return cursor.lastrowid

    try:
        trace_id = insert("INSERT INTO traces (name) VALUES (%s)", (trace_name,))

        for job in jobs:
            job.db_id = insert("INSERT INTO jobs (name, trace_id) VALUES (%s,%s)",
                               ("Job %d" % job.gwf_id, trace_id))

            for task in job.tasks:
                task.db_id = insert(
                    "INSERT INTO tasks (start_tick, total_flop_count, job_id, "
                    "parallelizability) VALUES (%s,%s,%s,'SEQUENTIAL')",
                    (task.submit_time, task.flops, job.db_id))

        # Second pass: every task now has a db id, so dependency edges
        # can reference both endpoints.
        for job in jobs:
            for task in job.tasks:
                for dependency in task.dependencies:
                    insert("INSERT INTO task_dependencies (first_task_id, second_task_id) "
                           "VALUES (%s,%s)",
                           (dependency.db_id, task.db_id))
    finally:
        # Close the connection even if an insert raises (the original
        # leaked it on any error path).
        conn.close()
+
+
def execute_insert_query(conn, cursor, sql, params=None):
    """Execute one INSERT statement and return the new row's id.

    :param conn: open database connection (used to commit)
    :param cursor: cursor on that connection
    :param sql: INSERT statement, optionally with %s placeholders
    :param params: optional parameter tuple for the placeholders; passing
        values here instead of pre-formatting them into ``sql`` lets the
        driver escape them (backward compatible: omit for a literal query)
    :return: the inserted row's id, or -1 if the insert failed
    """
    try:
        cursor.execute(sql, params)
    except mariadb.Error as error:
        print("SQL Error: {}".format(error))
        # Do not commit, and do not return cursor.lastrowid here: after a
        # failed execute it still holds the id of a *previous* statement,
        # which the old code silently handed back, mis-linking rows.
        # -1 matches the "not stored" sentinel used by Job/Task.
        return -1

    conn.commit()
    return cursor.lastrowid
+
+
if __name__ == "__main__":
    # Usage: one positional argument, the trace name without the .gwf suffix.
    if len(sys.argv) < 2:
        sys.exit("Usage: %s trace-name" % sys.argv[0])

    trace_name_arg = sys.argv[1]
    # Traces are expected under the local "traces" directory.
    trace_path = os.path.join("traces", trace_name_arg + ".gwf")
    parsed_jobs = get_jobs_from_gwf_file(trace_path)
    write_to_db(trace_name_arg, parsed_jobs)