diff options
| author | Georgios Andreadis <info@gandreadis.com> | 2020-06-30 14:40:18 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-08-24 19:43:48 +0200 |
| commit | 8ede2b8597eb14c0567c2c3c0135593f7c557521 (patch) | |
| tree | b54bb8f8fc6ba93bda3e3f127191bde96450c0ec /database/gwf_converter | |
| parent | 4c8a21da65e9a02437c03dcdb8aab6b054b651b0 (diff) | |
Continue cleaning up
Diffstat (limited to 'database/gwf_converter')
| -rw-r--r-- | database/gwf_converter/gwf_converter.py | 115 | ||||
| -rw-r--r-- | database/gwf_converter/requirements.txt | 1 | ||||
| -rw-r--r-- | database/gwf_converter/traces/default.gwf | 6 |
3 files changed, 0 insertions, 122 deletions
diff --git a/database/gwf_converter/gwf_converter.py b/database/gwf_converter/gwf_converter.py deleted file mode 100644 index 902bd93f..00000000 --- a/database/gwf_converter/gwf_converter.py +++ /dev/null @@ -1,115 +0,0 @@ -import os -import sys - -import mysql.connector as mariadb - - -class Job: - def __init__(self, gwf_id): - self.gwf_id = gwf_id - self.db_id = -1 - self.tasks = [] - - -class Task: - def __init__(self, gwf_id, job, submit_time, run_time, num_processors, dependency_gwf_ids): - self.gwf_id = gwf_id - self.job = job - self.submit_time = submit_time - self.run_time = run_time - self.cores = num_processors - self.flops = 4000 * run_time * num_processors - self.dependency_gwf_ids = dependency_gwf_ids - self.db_id = -1 - self.dependencies = [] - - -def get_jobs_from_gwf_file(file_name): - jobs = {} - tasks = {} - - with open(file_name, "r") as f: - # Skip first CSV header line - f.readline() - - for line in f: - if line.startswith("#") or len(line.strip()) == 0: - continue - - values = [col.strip() for col in line.split(",")] - cast_values = [int(values[i]) for i in range(len(values) - 1)] - job_id, task_id, submit_time, run_time, num_processors, req_num_processors = cast_values - dependency_gwf_ids = [int(val) for val in values[-1].split(" ") if val != ""] - - if job_id not in jobs: - jobs[job_id] = Job(job_id) - - new_task = Task(task_id, jobs[job_id], submit_time, run_time, num_processors, dependency_gwf_ids) - tasks[task_id] = new_task - jobs[job_id].tasks.append(new_task) - - for task in tasks.values(): - for dependency_gwf_id in task.dependency_gwf_ids: - if dependency_gwf_id in tasks: - task.dependencies.append(tasks[dependency_gwf_id]) - - return jobs.values() - - -def write_to_db(conn, trace_name, jobs): - cursor = conn.cursor() - - trace_id = execute_insert_query(conn, cursor, "INSERT INTO traces (name) VALUES ('%s')" % trace_name) - - for job in jobs: - job.db_id = execute_insert_query(conn, cursor, "INSERT INTO jobs (name, trace_id) VALUES ('%s',%d)" - % ("Job %d" % job.gwf_id, trace_id)) - - for task in job.tasks: - task.db_id = execute_insert_query(conn, cursor, - "INSERT INTO tasks (start_tick, total_flop_count, core_count, job_id) " - "VALUES (%d,%d,%d,%d)" - % (task.submit_time, task.flops, task.cores, job.db_id)) - - for job in jobs: - for task in job.tasks: - for dependency in task.dependencies: - execute_insert_query(conn, cursor, "INSERT INTO task_dependencies (first_task_id, second_task_id) " - "VALUES (%d,%d)" - % (dependency.db_id, task.db_id)) - -def execute_insert_query(conn, cursor, sql): - try: - cursor.execute(sql) - except mariadb.Error as error: - print("SQL Error: {}".format(error)) - - conn.commit() - return cursor.lastrowid - - -def main(trace_path): - trace_name = sys.argv[2] if (len(sys.argv) > 2) else \ - os.path.splitext(os.path.basename(trace_path))[0] - gwf_jobs = get_jobs_from_gwf_file(trace_path) - - host = os.environ.get('PERSISTENCE_HOST','localhost') - user = os.environ.get('PERSISTENCE_USER','opendc') - password = os.environ.get('PERSISTENCE_PASSWORD','opendcpassword') - database = os.environ.get('PERSISTENCE_DATABASE','opendc') - conn = mariadb.connect(host=host, user=user, password=password, database=database) - write_to_db(conn, trace_name, gwf_jobs) - conn.close() - - -if __name__ == "__main__": - if len(sys.argv) < 2: - sys.exit("Usage: %s file [name]" % sys.argv[0]) - - if sys.argv[1] in ("-a", "--all"): - for f in os.listdir("traces"): - if f.endswith(".gwf"): - print("Converting {}".format(f)) - main(os.path.join("traces", f)) - else: - main(sys.argv[1]) diff --git a/database/gwf_converter/requirements.txt b/database/gwf_converter/requirements.txt deleted file mode 100644 index 0eaebf12..00000000 --- a/database/gwf_converter/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -mysql diff --git a/database/gwf_converter/traces/default.gwf b/database/gwf_converter/traces/default.gwf deleted file mode 100644 index b1c55a17..00000000 --- a/database/gwf_converter/traces/default.gwf +++ /dev/null @@ -1,6 +0,0 @@ -WorkflowID, JobID , SubmitTime , RunTime , NProcs , ReqNProcs , Dependencies -0 , 1 , 1 , 1 , 1 , 1, 5 4 3 -0 , 2 , 2 , 2 , 2 , 2, 3 -0 , 3 , 3 , 3 , 3 , 3, 5 -0 , 4 , 4 , 4 , 4 , 4, -0 , 5 , 5 , 5 , 5 , 5, |
