diff options
| author | Georgios Andreadis <info@gandreadis.com> | 2020-06-29 16:06:35 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-08-24 16:04:00 +0200 |
| commit | 4a79cefdf5d71715b6c575d5c8bb4fea418c2ba6 (patch) | |
| tree | fc68847d1e010e2962dac8345a0fd0cc9a2f0681 /core/database/gwf_converter | |
| parent | ad31b66503ec65e611ab96c2a540180ed25f5a6f (diff) | |
Prepare opendc repository for monorepo
This change prepares the opendc repository for a monorepo setup by
moving all files to the core/ directory. After all repositories have
been merged into this repository, we will move the correct files back.
Diffstat (limited to 'core/database/gwf_converter')
| -rw-r--r-- | core/database/gwf_converter/gwf_converter.py | 115 | ||||
| -rw-r--r-- | core/database/gwf_converter/requirements.txt | 1 | ||||
| -rw-r--r-- | core/database/gwf_converter/traces/default.gwf | 6 |
3 files changed, 122 insertions, 0 deletions
diff --git a/core/database/gwf_converter/gwf_converter.py b/core/database/gwf_converter/gwf_converter.py new file mode 100644 index 00000000..902bd93f --- /dev/null +++ b/core/database/gwf_converter/gwf_converter.py @@ -0,0 +1,115 @@ +import os +import sys + +import mysql.connector as mariadb + + +class Job: + def __init__(self, gwf_id): + self.gwf_id = gwf_id + self.db_id = -1 + self.tasks = [] + + +class Task: + def __init__(self, gwf_id, job, submit_time, run_time, num_processors, dependency_gwf_ids): + self.gwf_id = gwf_id + self.job = job + self.submit_time = submit_time + self.run_time = run_time + self.cores = num_processors + self.flops = 4000 * run_time * num_processors + self.dependency_gwf_ids = dependency_gwf_ids + self.db_id = -1 + self.dependencies = [] + + +def get_jobs_from_gwf_file(file_name): + jobs = {} + tasks = {} + + with open(file_name, "r") as f: + # Skip first CSV header line + f.readline() + + for line in f: + if line.startswith("#") or len(line.strip()) == 0: + continue + + values = [col.strip() for col in line.split(",")] + cast_values = [int(values[i]) for i in range(len(values) - 1)] + job_id, task_id, submit_time, run_time, num_processors, req_num_processors = cast_values + dependency_gwf_ids = [int(val) for val in values[-1].split(" ") if val != ""] + + if job_id not in jobs: + jobs[job_id] = Job(job_id) + + new_task = Task(task_id, jobs[job_id], submit_time, run_time, num_processors, dependency_gwf_ids) + tasks[task_id] = new_task + jobs[job_id].tasks.append(new_task) + + for task in tasks.values(): + for dependency_gwf_id in task.dependency_gwf_ids: + if dependency_gwf_id in tasks: + task.dependencies.append(tasks[dependency_gwf_id]) + + return jobs.values() + + +def write_to_db(conn, trace_name, jobs): + cursor = conn.cursor() + + trace_id = execute_insert_query(conn, cursor, "INSERT INTO traces (name) VALUES ('%s')" % trace_name) + + for job in jobs: + job.db_id = execute_insert_query(conn, cursor, "INSERT INTO jobs (name, trace_id) VALUES ('%s',%d)" + % ("Job %d" % job.gwf_id, trace_id)) + + for task in job.tasks: + task.db_id = execute_insert_query(conn, cursor, + "INSERT INTO tasks (start_tick, total_flop_count, core_count, job_id) " + "VALUES (%d,%d,%d,%d)" + % (task.submit_time, task.flops, task.cores, job.db_id)) + + for job in jobs: + for task in job.tasks: + for dependency in task.dependencies: + execute_insert_query(conn, cursor, "INSERT INTO task_dependencies (first_task_id, second_task_id) " + "VALUES (%d,%d)" + % (dependency.db_id, task.db_id)) + +def execute_insert_query(conn, cursor, sql): + try: + cursor.execute(sql) + except mariadb.Error as error: + print("SQL Error: {}".format(error)) + + conn.commit() + return cursor.lastrowid + + +def main(trace_path): + trace_name = sys.argv[2] if (len(sys.argv) > 2) else \ + os.path.splitext(os.path.basename(trace_path))[0] + gwf_jobs = get_jobs_from_gwf_file(trace_path) + + host = os.environ.get('PERSISTENCE_HOST','localhost') + user = os.environ.get('PERSISTENCE_USER','opendc') + password = os.environ.get('PERSISTENCE_PASSWORD','opendcpassword') + database = os.environ.get('PERSISTENCE_DATABASE','opendc') + conn = mariadb.connect(host=host, user=user, password=password, database=database) + write_to_db(conn, trace_name, gwf_jobs) + conn.close() + + +if __name__ == "__main__": + if len(sys.argv) < 2: + sys.exit("Usage: %s file [name]" % sys.argv[0]) + + if sys.argv[1] in ("-a", "--all"): + for f in os.listdir("traces"): + if f.endswith(".gwf"): + print("Converting {}".format(f)) + main(os.path.join("traces", f)) + else: + main(sys.argv[1]) diff --git a/core/database/gwf_converter/requirements.txt b/core/database/gwf_converter/requirements.txt new file mode 100644 index 00000000..0eaebf12 --- /dev/null +++ b/core/database/gwf_converter/requirements.txt @@ -0,0 +1 @@ +mysql diff --git a/core/database/gwf_converter/traces/default.gwf b/core/database/gwf_converter/traces/default.gwf new file mode 100644 index 00000000..b1c55a17 --- /dev/null +++ b/core/database/gwf_converter/traces/default.gwf @@ -0,0 +1,6 @@ +WorkflowID, JobID , SubmitTime , RunTime , NProcs , ReqNProcs , Dependencies +0 , 1 , 1 , 1 , 1 , 1, 5 4 3 +0 , 2 , 2 , 2 , 2 , 2, 3 +0 , 3 , 3 , 3 , 3 , 3, 5 +0 , 4 , 4 , 4 , 4 , 4, +0 , 5 , 5 , 5 , 5 , 5, |
