From 6b078c67594fac6bf5ee0204552d6330a5fff2d6 Mon Sep 17 00:00:00 2001
From: Georgios Andreadis
Date: Mon, 5 Feb 2018 10:16:01 +0100
Subject: Implement workflow task dependencies

---
 database/schema.sql | 32 +++++++++++++++++-------
 database/test.sql   | 70 +++++++++++++++++++++++++++--------------------------
 2 files changed, 59 insertions(+), 43 deletions(-)

(limited to 'database')

diff --git a/database/schema.sql b/database/schema.sql
index 7f0d5879..89ba841e 100644
--- a/database/schema.sql
+++ b/database/schema.sql
@@ -63,7 +63,7 @@ INSERT INTO authorization_levels (level) VALUES ('VIEW');
 * - DD is the two-digit day of the month (1-31)
 * - HH is the two-digit hours part (0-23)
 * - MM is the two-digit minutes part (0-59)
-* - SS is the two-digit secodns part (0-59)
+* - SS is the two-digit seconds part (0-59)
 */

 -- Simulation
@@ -180,13 +180,24 @@ CREATE TABLE tasks (
   start_tick         INTEGER NOT NULL CHECK (start_tick >= 0),
   total_flop_count   INTEGER NOT NULL,
   job_id             INTEGER NOT NULL,
-  task_dependency_id INTEGER NULL,
   parallelizability  VARCHAR(50) NOT NULL,

   FOREIGN KEY (job_id) REFERENCES jobs (id)
+    ON DELETE CASCADE
+    ON UPDATE CASCADE
+);
+
+-- A dependency between two tasks.
+DROP TABLE IF EXISTS task_dependencies;
+CREATE TABLE task_dependencies (
+  first_task_id  INTEGER NOT NULL,
+  second_task_id INTEGER NOT NULL,
+
+  PRIMARY KEY (first_task_id, second_task_id),
+  FOREIGN KEY (first_task_id) REFERENCES tasks (id)
     ON DELETE CASCADE
     ON UPDATE CASCADE,
-  FOREIGN KEY (task_dependency_id) REFERENCES tasks (id)
+  FOREIGN KEY (second_task_id) REFERENCES tasks (id)
     ON DELETE CASCADE
     ON UPDATE CASCADE
 );
@@ -318,8 +329,9 @@ DELIMITER //
 -- and tiles in a room are connected.
 DROP TRIGGER IF EXISTS before_insert_tiles_check_existence;
 CREATE TRIGGER before_insert_tiles_check_existence
-BEFORE INSERT ON tiles
-FOR EACH ROW
+  BEFORE INSERT
+  ON tiles
+  FOR EACH ROW
 BEGIN
   -- checking tile overlap
   -- a tile already exists such that..
@@ -416,8 +428,9 @@ DELIMITER //
 -- Make sure objects are added to tiles in rooms they're allowed to be in.
 DROP TRIGGER IF EXISTS before_update_tiles;
 CREATE TRIGGER before_update_tiles
-BEFORE UPDATE ON tiles
-FOR EACH ROW
+  BEFORE UPDATE
+  ON tiles
+  FOR EACH ROW
 BEGIN
   IF ((NEW.object_id IS NOT NULL) AND
       (
@@ -543,8 +556,9 @@ DELIMITER //
 -- Make sure a machine is not inserted at a position that does not exist for its rack.
 DROP TRIGGER IF EXISTS before_insert_machine;
 CREATE TRIGGER before_insert_machine
-BEFORE INSERT ON machines
-FOR EACH ROW
+  BEFORE INSERT
+  ON machines
+  FOR EACH ROW
 BEGIN
   IF (
     NEW.position > (SELECT capacity
diff --git a/database/test.sql b/database/test.sql
index fa7fb8aa..452544e0 100644
--- a/database/test.sql
+++ b/database/test.sql
@@ -43,8 +43,9 @@ INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALU
 INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (25, 10000, 1, 'PARALLEL');
 INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (25, 10000, 1, 'PARALLEL');
 INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (26, 10000, 1, 'PARALLEL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (80, 200000, 1, 1, 'PARALLEL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (80, 200000, 1, 'PARALLEL');
+
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (1, 5);

 -- Image Processing Trace
 INSERT INTO traces (name) VALUES ('Image Processing');
@@ -121,38 +122,39 @@ INSERT INTO traces (name) VALUES ('Path planning');
 INSERT INTO jobs (name, trace_id) VALUES ('Path planning', 3);

 INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (0, 1000000, 3, 'PARALLEL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (11, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (12, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (13, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (14, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (11, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (12, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (13, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (14, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (11, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (12, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (13, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (14, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (11, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (12, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (13, 200000, 3, 66, 'SEQUENTIAL');
-INSERT INTO tasks (start_tick, total_flop_count, job_id, task_dependency_id, parallelizability)
-VALUES (14, 200000, 3, 66, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (11, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (12, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (13, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (14, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (11, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (12, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (13, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (14, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (11, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (12, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (13, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (14, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (11, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (12, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (13, 200000, 3, 'SEQUENTIAL');
+INSERT INTO tasks (start_tick, total_flop_count, job_id, parallelizability) VALUES (14, 200000, 3, 'SEQUENTIAL');
+
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 67);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 68);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 69);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 70);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 71);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 72);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 73);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 74);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 75);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 76);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 77);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 78);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 79);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 80);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 81);
+INSERT INTO task_dependencies (first_task_id, second_task_id) VALUES (66, 82);

 -- Parallelizable Trace
 INSERT INTO traces (name) VALUES ('Parallel heavy trace');
--
cgit v1.2.3
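
The task_dependencies table introduced by this commit replaces the old single-parent task_dependency_id column with a directed edge list: a row (first_task_id, second_task_id) records that the second task depends on the first, so a task can now have any number of predecessors. As a minimal illustration against the test data above (this query is not part of the patch), the direct successors of task 1 can be listed with:

SELECT t.id, t.start_tick, t.total_flop_count
FROM task_dependencies d
  JOIN tasks t ON t.id = d.second_task_id
WHERE d.first_task_id = 1;

Both foreign keys cascade, so deleting either endpoint also removes the edge, and the composite primary key rules out duplicate dependencies between the same pair of tasks.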

From e7880192e47115abf648310f0f48883da18c1bb0 Mon Sep 17 00:00:00 2001
From: Georgios Andreadis
Date: Tue, 6 Feb 2018 11:59:54 +0100
Subject: Add GWF conversion script

---
 database/gwf_converter/gwf_converter.py   | 99 +++++++++++++++++++++++++++++++
 database/gwf_converter/requirements.txt   |  1 +
 database/gwf_converter/traces/default.gwf |  6 ++
 database/schema.sql                       | 10 ++--
 4 files changed, 111 insertions(+), 5 deletions(-)
 create mode 100644 database/gwf_converter/gwf_converter.py
 create mode 100644 database/gwf_converter/requirements.txt
 create mode 100644 database/gwf_converter/traces/default.gwf

(limited to 'database')

diff --git a/database/gwf_converter/gwf_converter.py b/database/gwf_converter/gwf_converter.py
new file mode 100644
index 00000000..0745fac3
--- /dev/null
+++ b/database/gwf_converter/gwf_converter.py
@@ -0,0 +1,99 @@
+import os
+import sys
+
+import mysql.connector as mariadb
+
+
+class Job:
+    def __init__(self, gwf_id):
+        self.gwf_id = gwf_id
+        self.db_id = -1
+        self.tasks = []
+
+
+class Task:
+    def __init__(self, gwf_id, job, submit_time, run_time, num_processors, dependency_gwf_ids):
+        self.gwf_id = gwf_id
+        self.job = job
+        self.submit_time = submit_time
+        self.run_time = run_time
+        self.flops = 10 ** 9 * run_time * num_processors
+        self.dependency_gwf_ids = dependency_gwf_ids
+        self.db_id = -1
+        self.dependencies = []
+
+
+def get_jobs_from_gwf_file(file_name):
+    jobs = {}
+    tasks = {}
+
+    with open(file_name, "r") as f:
+        # Skip first CSV header line
+        f.readline()
+
+        for line in f:
+            if line.startswith("#") or len(line.strip()) == 0:
+                continue
+
+            values = [col.strip() for col in line.split(",")]
+            cast_values = [int(values[i]) for i in range(len(values) - 1)]
+            job_id, task_id, submit_time, run_time, num_processors, req_num_processors = cast_values
+            dependency_gwf_ids = [int(val) for val in values[-1].split(" ") if val != ""]
+
+            if job_id not in jobs:
+                jobs[job_id] = Job(job_id)
+
+            new_task = Task(task_id, jobs[job_id], submit_time, run_time, num_processors, dependency_gwf_ids)
+            tasks[task_id] = new_task
+            jobs[job_id].tasks.append(new_task)
+
+    for task in tasks.values():
+        for dependency_gwf_id in task.dependency_gwf_ids:
+            if dependency_gwf_id in tasks:
+                task.dependencies.append(tasks[dependency_gwf_id])
+
+    return jobs.values()
+
+
+def write_to_db(trace_name, jobs):
+    conn = mariadb.connect(user='opendc', password='opendcpassword', database='opendc')
+    cursor = conn.cursor()
+
+    trace_id = execute_insert_query(conn, cursor, "INSERT INTO traces (name) VALUES ('%s')" % trace_name)
+
+    for job in jobs:
+        job.db_id = execute_insert_query(conn, cursor, "INSERT INTO jobs (name, trace_id) VALUES ('%s',%d)"
+                                         % ("Job %d" % job.gwf_id, trace_id))
+
+        for task in job.tasks:
+            task.db_id = execute_insert_query(conn, cursor, "INSERT INTO tasks (start_tick, total_flop_count, job_id, "
+                                                            "parallelizability) VALUES (%d,%d,%d,'SEQUENTIAL')"
+                                              % (task.submit_time, task.flops, job.db_id))
+
+    for job in jobs:
+        for task in job.tasks:
+            for dependency in task.dependencies:
+                execute_insert_query(conn, cursor, "INSERT INTO task_dependencies (first_task_id, second_task_id) "
+                                                   "VALUES (%d,%d)"
+                                     % (dependency.db_id, task.db_id))
+
+    conn.close()
+
+
+def execute_insert_query(conn, cursor, sql):
+    try:
+        cursor.execute(sql)
+    except mariadb.Error as error:
+        print("SQL Error: {}".format(error))
+
+    conn.commit()
+    return cursor.lastrowid
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 2:
+        sys.exit("Usage: %s trace-name" % sys.argv[0])
+
+    gwf_trace_name = sys.argv[1]
+    gwf_jobs = get_jobs_from_gwf_file(os.path.join("traces", gwf_trace_name + ".gwf"))
+    write_to_db(gwf_trace_name, gwf_jobs)
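
The converter above derives a task's total work from the GWF runtime and processor count as flops = 10^9 * run_time * num_processors, i.e. it assumes each processor sustains 10^9 floating point operations per second. For the second row of the default.gwf trace below (RunTime 2, NProcs 2), that works out to 10^9 * 2 * 2 = 4 * 10^9 operations. Tasks are imported as 'SEQUENTIAL' at this point; a later commit in this series changes that default to 'PARALLEL'.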
diff --git a/database/gwf_converter/requirements.txt b/database/gwf_converter/requirements.txt
new file mode 100644
index 00000000..0eaebf12
--- /dev/null
+++ b/database/gwf_converter/requirements.txt
@@ -0,0 +1 @@
+mysql
diff --git a/database/gwf_converter/traces/default.gwf b/database/gwf_converter/traces/default.gwf
new file mode 100644
index 00000000..b1c55a17
--- /dev/null
+++ b/database/gwf_converter/traces/default.gwf
@@ -0,0 +1,6 @@
+WorkflowID, JobID , SubmitTime , RunTime , NProcs , ReqNProcs , Dependencies
+0         , 1     , 1          , 1       , 1      , 1, 5 4 3
+0         , 2     , 2          , 2       , 2      , 2, 3
+0         , 3     , 3          , 3       , 3      , 3, 5
+0         , 4     , 4          , 4       , 4      , 4,
+0         , 5     , 5          , 5       , 5      , 5,
diff --git a/database/schema.sql b/database/schema.sql
index 89ba841e..c6f34f17 100644
--- a/database/schema.sql
+++ b/database/schema.sql
@@ -176,11 +176,11 @@ CREATE TABLE jobs (
 -- A task that's defined in terms of how many flops (floating point operations) it takes to complete
 DROP TABLE IF EXISTS tasks;
 CREATE TABLE tasks (
-  id                 INTEGER PRIMARY KEY NOT NULL AUTO_INCREMENT,
-  start_tick         INTEGER NOT NULL CHECK (start_tick >= 0),
-  total_flop_count   INTEGER NOT NULL,
-  job_id             INTEGER NOT NULL,
-  parallelizability  VARCHAR(50) NOT NULL,
+  id                INTEGER PRIMARY KEY NOT NULL AUTO_INCREMENT,
+  start_tick        INTEGER     NOT NULL CHECK (start_tick >= 0),
+  total_flop_count  BIGINT      NOT NULL,
+  job_id            INTEGER     NOT NULL,
+  parallelizability VARCHAR(50) NOT NULL,

   FOREIGN KEY (job_id) REFERENCES jobs (id)
     ON DELETE CASCADE
--
cgit v1.2.3
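
Widening total_flop_count from INTEGER to BIGINT in the same commit is what makes these estimates storable: a signed 32-bit INTEGER tops out at 2,147,483,647, so even a single-processor task running for three seconds (3 * 10^9 flops) would overflow the old column. After running the converter, e.g. with python gwf_converter.py default, an import can be sanity-checked with a query along these lines (illustrative only, not part of the patch):

SELECT tr.name, COUNT(DISTINCT t.id) AS tasks, COUNT(d.first_task_id) AS dependency_edges
FROM traces tr
  JOIN jobs j ON j.trace_id = tr.id
  JOIN tasks t ON t.job_id = j.id
  LEFT JOIN task_dependencies d ON d.second_task_id = t.id
WHERE tr.name = 'default'
GROUP BY tr.name;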

From 2212162c1311ce9b39cb2142f52bffddac3b9559 Mon Sep 17 00:00:00 2001
From: Georgios Andreadis
Date: Thu, 8 Feb 2018 15:33:20 +0100
Subject: Convert default GWF task concurrency to parallel

---
 database/gwf_converter/gwf_converter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'database')

diff --git a/database/gwf_converter/gwf_converter.py b/database/gwf_converter/gwf_converter.py
index 0745fac3..81de2440 100644
--- a/database/gwf_converter/gwf_converter.py
+++ b/database/gwf_converter/gwf_converter.py
@@ -67,7 +67,7 @@ def write_to_db(trace_name, jobs):

         for task in job.tasks:
             task.db_id = execute_insert_query(conn, cursor, "INSERT INTO tasks (start_tick, total_flop_count, job_id, "
-                                                            "parallelizability) VALUES (%d,%d,%d,'SEQUENTIAL')"
+                                                            "parallelizability) VALUES (%d,%d,%d,'PARALLEL')"
                                               % (task.submit_time, task.flops, job.db_id))

     for job in jobs:
--
cgit v1.2.3

From b6875e16b1683f86226449d577a8c98b04d70915 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Thu, 15 Feb 2018 23:03:01 +0100
Subject: chore: Automatically import database schema in Docker image

This change will make the database image automatically import the schema
and test data.
---
 database/Dockerfile | 8 ++++++++
 1 file changed, 8 insertions(+)
 create mode 100644 database/Dockerfile

(limited to 'database')

diff --git a/database/Dockerfile b/database/Dockerfile
new file mode 100644
index 00000000..0e933b40
--- /dev/null
+++ b/database/Dockerfile
@@ -0,0 +1,8 @@
+FROM mariadb:10.1
+MAINTAINER Fabian Mastenbroek
+
+# Import schema into database
+ADD schema.sql /docker-entrypoint-initdb.d
+
+# Add test data into database
+ADD test.sql /docker-entrypoint-initdb.d
--
cgit v1.2.3
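
The mariadb base image executes any .sql files found in /docker-entrypoint-initdb.d, in alphabetical order, the first time a container starts with an empty data directory, so schema.sql is imported before test.sql. A typical invocation (the image tag and root password here are placeholders, not part of the patch) would be docker build -t opendc-database database/ followed by docker run -e MYSQL_ROOT_PASSWORD=<secret> -e MYSQL_DATABASE=opendc -e MYSQL_USER=opendc -e MYSQL_PASSWORD=opendcpassword opendc-database, matching the credentials the GWF converter above connects with.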