summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-m3sa/src/main/python
diff options
context:
space:
mode:
authorRadu Nicolae <rnicolae04@gmail.com>2024-10-25 08:21:49 +0200
committerGitHub <noreply@github.com>2024-10-25 08:21:49 +0200
commit27f5b7dcb05aefdab9b762175d538931face0aba (patch)
treeaed9b6cd324f73d4db9af5fc70000a62b4422fc1 /opendc-experiments/opendc-experiments-m3sa/src/main/python
parent4a010c6b9e033314a2624a0756dcdc7f17010d9d (diff)
M3SA - Multi-Meta-Model Simulation Analyzer (#251)
* (feat) demo files are now ignored * integrating m3sa changes with opendc * gitignore ignores demo * m3sa linked, tested, works 🎉🎆 * linting & checks fully pass * m3sa documentation (re...)added * package.json added, a potentail solution for Build Docker Images workflow * (fix) opendc-m3sa renamed to opendc-experiments-m3sa * (feat) Model is now a dataclass * (fix) package and package-lock reverted as before the PR, now they mirror the opendc master branch * (fix) Experiments renamed to experiment * branch updated with changes from master branch * trying to fix the build docker image failed workflow * trying to fix the build docker image failed workflow * All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability. (#255) (#37) Co-authored-by: Dante Niewenhuis <d.niewenhuis@hotmail.com> * All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability. (#255) (#38) Co-authored-by: Dante Niewenhuis <d.niewenhuis@hotmail.com> * All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability. (#255) (#39) Co-authored-by: Dante Niewenhuis <d.niewenhuis@hotmail.com> * [TEMP](feat) m3saCli decoupled from experimentCli * spotless and minor refactoring * (feat)[TEMP] decoupling m3sa from experiment * spotless applied * documentation resolved * requirements.txt added * path to M3SA is now provided as a parameter to M3SACLI * spotless applied * (fix) python environment variables solved, output analysis folder solved * documentation changed and matching the master branch doc * package-lock reverted * package-lock reverted --------- Co-authored-by: Dante Niewenhuis <d.niewenhuis@hotmail.com>
Diffstat (limited to 'opendc-experiments/opendc-experiments-m3sa/src/main/python')
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/accuracy_evaluator.py114
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/input_parser.py135
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/main.py20
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MetaModel.py214
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/models/Model.py70
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MultiModel.py501
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/requirements.txt4
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/simulator_specifics.py14
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/utils.py25
9 files changed, 1097 insertions, 0 deletions
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/accuracy_evaluator.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/accuracy_evaluator.py
new file mode 100644
index 00000000..463f69e6
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/accuracy_evaluator.py
@@ -0,0 +1,114 @@
+import numpy as np
+
+from models.MetaModel import MetaModel
+
+
+def accuracy_evaluator(
+ real_data,
+ multi_model,
+ compute_mape=True,
+ compute_nad=True,
+ compute_rmsle=True,
+ rmsle_hyperparameter=0.5,
+ only_metamodel=False
+):
+ """
+ :param real_data: the real-world data of the simulation
+ :param multi_model: the Multi-Model, containing individual models (possibly also a Meta-Model, with id=101)
+ :param MAPE: whether to calculate Mean Absolute Percentage Error (MAPE)
+ :param NAD: whether to calculate Normalized Absolute Differences (NAD)
+ :param RMSLE: whether to calculate Root Mean Square Logarithmic Error (RMSLE)
+ :param rmsle_hyperparameter: the hyperparameter that balances the ration underestimations:overestimations
+ - default is 0.5 (balanced penalty)
+ - < 0.5: more penalty for overestimations
+ - > 0.5: more penalty for underestimations
+ e.g., RMSLE_hyperparameter=0.3 -> 30% penalty for overestimations, 70% penalty for underestimations (3:7 ratio)
+ :return: None, but prints the accuracy metrics
+ """
+
+ meta_model = MetaModel(multimodel=multi_model)
+ multi_model.models.append(meta_model.meta_model) # metamodel
+ # multi_model.models.append(Model(raw_host_data=real_data, id=-1, path=None)) # real-world data
+
+ with open(multi_model.output_folder_path + "/accuracy_report.txt", "a") as f:
+ f.write("====================================\n")
+ f.write("Accuracy Report, against ground truth\n")
+
+ for model in multi_model.models:
+ if only_metamodel and model.id != 101:
+ continue
+
+ if model.id == -1:
+ f.write("Real-World data")
+ elif model.id == 101:
+ f.write(
+ f"Meta-Model, meta-function: {multi_model.user_input['meta_function']}, window_size: {meta_model.multi_model.window_size}")
+ else:
+ f.write(f"Model {model.id}")
+
+ simulation_data = model.raw_sim_data
+ min_len = min(len(real_data), len(simulation_data))
+ real_data = real_data[:min_len]
+ simulation_data = simulation_data[:min_len]
+ if compute_mape:
+ accuracy_mape = mape(
+ real_data=real_data,
+ simulation_data=simulation_data
+ )
+ f.write(f"\nMean Absolute Percentage Error (MAPE): {accuracy_mape}%")
+
+ if compute_nad:
+ accuracy_nad = nad(
+ real_data=real_data,
+ simulation_data=simulation_data
+ )
+ f.write(f"\nNormalized Absolute Differences (NAD): {accuracy_nad}%")
+
+ if compute_rmsle:
+ accuracy_rmsle = rmsle(
+ real_data=real_data,
+ simulation_data=simulation_data,
+ alpha=rmsle_hyperparameter
+ )
+ f.write(
+ f"\nRoot Mean Square Logarithmic Error (RMSLE), alpha={rmsle_hyperparameter}:{accuracy_rmsle}\n\n")
+
+ f.write("====================================\n")
+
+
+def mape(real_data, simulation_data):
+ """
+ Calculate Mean Absolute Percentage Error (MAPE)
+ :param real_data: Array of real values
+ :param simulation_data: Array of simulated values
+ :return: MAPE value
+ """
+ real_data = np.array(real_data)
+ simulation_data = np.array(simulation_data)
+ return round(np.mean(np.abs((real_data - simulation_data) / real_data)) * 100, 3)
+
+
+def nad(real_data, simulation_data):
+ """
+ Calculate Normalized Absolute Differences (NAD)
+ :param real_data: Array of real values
+ :param simulation_data: Array of simulated values
+ :return: NAD value
+ """
+ real_data = np.array(real_data)
+ simulation_data = np.array(simulation_data)
+ return round(np.sum(np.abs(real_data - simulation_data)) / np.sum(real_data) * 100, 3)
+
+
+def rmsle(real_data, simulation_data, alpha=0.5):
+ """
+ Calculate Root Mean Square Logarithmic Error (RMSLE) with an adjustable alpha parameter
+ :param real_data: Array of real values
+ :param simulation_data: Array of simulated values
+ :param alpha: Hyperparameter that balances the penalty between underestimations and overestimations
+ :return: RMSLE value
+ """
+ real_data = np.array(real_data)
+ simulation_data = np.array(simulation_data)
+ log_diff = alpha * np.log(real_data) - (1 - alpha) * np.log(simulation_data)
+ return round(np.sqrt(np.mean(log_diff ** 2)) * 100, 3)
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/input_parser.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/input_parser.py
new file mode 100644
index 00000000..cb1bc2b9
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/input_parser.py
@@ -0,0 +1,135 @@
+import json
+import os
+import sys
+import warnings
+
+
+def read_input(path=""):
+ """
+ Reads and processes the input JSON file from the specified path. Validates the input path,
+ ensures the file exists, and decodes the JSON content. Switches to the project root directory
+ before returning the parsed input.
+
+ :param path: The relative path to the input JSON file.
+ :type path: str
+ :raises ValueError: If the input path is not provided, file does not exist, or JSON decoding fails.
+ :return: Parsed JSON content.
+ :rtype: dict
+ :side effect: Changes the working directory to the project root.
+ """
+ if not path:
+ raise ValueError("No input path provided.")
+
+ path = path.strip().strip(',')
+
+ project_root = find_root_dir()
+ if not project_root:
+ raise ValueError("Project root not found.")
+
+ full_path = os.path.join(project_root, path)
+
+ if not os.path.exists(full_path):
+ raise ValueError(f"File does not exist: {full_path}")
+
+ try:
+ with open(full_path, 'r') as raw_json:
+ input_json = json.load(raw_json)
+ except json.JSONDecodeError:
+ raise ValueError("Failed to decode JSON.")
+ except IOError:
+ raise ValueError("MultiModel's parser says: Error opening file.")
+
+ switch_to_root_dir()
+
+ # Validate and apply defaults
+ input_json = parse_input(input_json)
+ return input_json
+
+
+def parse_input(input_json):
+ """
+ Validates and applies default values to the input JSON content. Ensures required fields are present
+ and raises warnings or errors for missing or invalid values.
+
+ :param input_json: The input JSON content.
+ :type input_json: dict
+ :raises ValueError: If required fields are missing or invalid values are provided.
+ :return: Validated and processed JSON content with defaults applied.
+ :rtype: dict
+ """
+
+ DEFAULTS = {
+ "multimodel": True,
+ "metamodel": False,
+ "window_size": 1,
+ "window_function": "mean",
+ "meta_function": "mean",
+ "samples_per_minute": 0,
+ "current_unit": "",
+ "unit_scaling_magnitude": 1,
+ "plot_type": "time_series",
+ "plot_title": "",
+ "x_label": "",
+ "y_label": "",
+ "seed": 0,
+ "y_ticks_count": None,
+ "x_ticks_count": None,
+ "y_min": None,
+ "y_max": None,
+ "x_min": None,
+ "x_max": None,
+ }
+
+ # Apply default values where not specified
+ for key, default_value in DEFAULTS.items():
+ if key not in input_json:
+ input_json[key] = default_value
+
+ # Special handling for required fields without default values
+ if "metric" not in input_json:
+ raise ValueError("Required field 'metric' is missing.")
+
+ if ("meta_function" not in input_json) and input_json["metamodel"]:
+ raise ValueError("Required field 'meta_function' is missing. Please select between 'mean' and 'median'. Alternatively,"
+ "disable metamodel in the config file.")
+
+ if input_json["meta_function"] not in ["mean", "median", "meta_equation1", "equation2", "equation3"]:
+ raise ValueError("Invalid value for meta_function. Please select between 'mean', 'median', !!!!!!!to be updated in the end!!!!!!!!.")
+
+ # raise a warning
+ if not input_json["multimodel"] and input_json["metamodel"]:
+ warnings.warn("Warning: Cannot have a Meta-Model without a Multi-Model. No computation made.")
+
+ return input_json
+
+
+def find_root_dir():
+ """
+ Searches for the project root directory by looking for a 'README.md' file in the current
+ and parent directories.
+
+ :return: The path to the project root directory if found, otherwise None.
+ :rtype: str or None
+ """
+ current_dir = os.path.dirname(os.path.abspath(__file__))
+ root = os.path.abspath(os.sep)
+ while current_dir and current_dir != root:
+ if os.path.exists(os.path.join(current_dir, 'README.md')):
+ return current_dir
+ current_dir = os.path.dirname(current_dir)
+ return None
+
+
+def switch_to_root_dir():
+ """
+ Switches the current working directory to the project root directory. Exits the program if the
+ root directory is not found.
+
+ :side effect: Changes the current working directory or exits the program.
+ """
+ root_dir = find_root_dir()
+ if root_dir:
+ os.chdir(root_dir)
+ else:
+ print("Failed to switch to root directory.")
+ sys.exit(1)
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/main.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/main.py
new file mode 100644
index 00000000..11ee836d
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/main.py
@@ -0,0 +1,20 @@
+from os import sys
+
+from input_parser import read_input
+from models.MetaModel import MetaModel
+from models.MultiModel import MultiModel
+
+
+def main():
+ multimodel = MultiModel(
+ user_input=read_input(sys.argv[2]),
+ path=sys.argv[1],
+ )
+
+ multimodel.generate_plot()
+
+ MetaModel(multimodel)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MetaModel.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MetaModel.py
new file mode 100644
index 00000000..49930d25
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MetaModel.py
@@ -0,0 +1,214 @@
+import numpy as np
+import os
+import pandas as pd
+
+from .Model import Model
+
+
+class MetaModel:
+ """
+ A class that aggregates results from multiple simulation models based on user-defined functions, producing
+ consolidated outputs for analysis.
+
+ Attributes:
+ multi_model (MultiModel): The container of models whose results are aggregated.
+ meta_model (Model): Model instance that stores aggregated results.
+ meta_function (function): Function used to calculate aggregated data.
+ min_raw_model_len (int): Minimum length of raw data arrays across all models.
+ min_processed_model_len (int): Minimum length of processed data arrays across all models.
+ number_of_models (int): Number of models being aggregated.
+ function_map (dict): Mapping of aggregation function names to function implementations.
+ """
+
+ META_MODEL_ID = -101
+
+ def __init__(self, multimodel, meta_function=None):
+ """
+ Initializes the Metamodel with a MultiModel instance and prepares aggregation functions based on configuration.
+
+ :param multimodel: MultiModel instance containing the models to aggregate.
+ :raise ValueError: If metamodel functionality is not enabled in the configuration.
+ """
+ if not multimodel.user_input.get('metamodel', False):
+ raise ValueError("Metamodel is not enabled in the config file")
+
+ self.function_map = {
+ 'mean': self.mean,
+ 'median': self.median,
+ 'meta_equation1': self.meta_equation1,
+ }
+
+ self.multi_model = multimodel
+ self.meta_model = Model(
+ raw_sim_data=[],
+ id=self.META_MODEL_ID,
+ path=self.multi_model.output_folder_path
+ )
+
+ if meta_function is not None:
+ self.meta_function = meta_function
+ else:
+ self.meta_function = self.function_map.get(multimodel.user_input['meta_function'], self.mean)
+
+ self.min_raw_model_len = min([len(model.raw_sim_data) for model in self.multi_model.models])
+ self.min_processed_model_len = min([len(model.processed_sim_data) for model in self.multi_model.models])
+ self.number_of_models = len(self.multi_model.models)
+ self.compute()
+ self.output()
+
+ def output(self):
+ """
+ Generates outputs by plotting the aggregated results and exporting the metamodel data to a file.
+ :return: None
+ :side effect: Outputs data to files and generates plots.
+ """
+ self.plot()
+ self.output_metamodel()
+
+ def compute(self):
+ """
+ Computes aggregated data based on the specified plot type from the configuration.
+ :raise ValueError: If an unsupported plot type is specified in the configuration.
+ """
+ if self.multi_model.plot_type == 'time_series':
+ self.compute_time_series()
+ elif self.multi_model.plot_type == 'cumulative':
+ self.compute_cumulative()
+ elif self.multi_model.plot_type == 'cumulative_time_series':
+ self.compute_cumulative_time_series()
+ else:
+ raise ValueError("Invalid plot type in config file")
+
+ def plot(self):
+ """
+ Plots the aggregated data according to the specified plot type from the configuration.
+ :raise ValueError: If an unsupported plot type is specified.
+ """
+ if self.multi_model.plot_type == 'time_series':
+ self.plot_time_series()
+ elif self.multi_model.plot_type == 'cumulative':
+ self.plot_cumulative()
+ elif self.multi_model.plot_type == 'cumulative_time_series':
+ self.plot_cumulative_time_series()
+
+ else:
+ raise ValueError("Invalid plot type in config file")
+
+ def compute_time_series(self):
+ """
+ Aggregates time series data across models using the specified aggregation function.
+ :return: None
+ :side effect: Updates the meta_model's processed data with aggregated results.
+ """
+ for i in range(0, self.min_processed_model_len):
+ data_entries = []
+ for j in range(self.number_of_models):
+ data_entries.append(self.multi_model.models[j].processed_sim_data[i])
+ self.meta_model.processed_sim_data.append(self.meta_function(data_entries))
+ self.meta_model.raw_sim_data = self.meta_model.processed_sim_data
+
+ def plot_time_series(self):
+ """
+ Generates a time series plot of the aggregated data.
+ :return: None
+ :side effect: Displays a time series plot using the multi_model's plotting capabilities.
+ """
+ self.multi_model.models.append(self.meta_model)
+ self.multi_model.generate_plot()
+
+ def compute_cumulative(self):
+ """
+ Aggregates cumulative data entries across all models.
+ :return: None
+ :side effect: Updates the meta_model's cumulative data with aggregated results.
+ """
+
+ for i in range(0, self.min_raw_model_len):
+ data_entries = []
+ for j in range(self.number_of_models):
+ sim_data = self.multi_model.models[j].raw_sim_data
+ ith_element = sim_data[i]
+ data_entries.append(ith_element)
+ self.meta_model.cumulated += self.mean(data_entries)
+ self.meta_model.cumulated = round(self.meta_model.cumulated, 2)
+
+ def plot_cumulative(self):
+ """
+ Generates a cumulative plot of the aggregated data.
+ :return: None
+ :side effect: Displays a cumulative plot using the multi_model's plotting capabilities.
+ """
+ self.multi_model.models.append(self.meta_model)
+ self.multi_model.generate_plot()
+
+ def compute_cumulative_time_series(self):
+ """
+ Aggregates cumulative time series data entries across models using the specified aggregation function.
+ :return: None
+ :side effect: Updates the meta_model's processed data with cumulative aggregated results.
+ """
+ for i in range(0, self.min_processed_model_len):
+ data_entries = []
+ for j in range(self.number_of_models):
+ data_entries.append(self.multi_model.models[j].processed_sim_data[i])
+ self.meta_model.processed_sim_data.append(self.meta_function(data_entries))
+
+ def plot_cumulative_time_series(self):
+ """
+ Generates a cumulative time series plot of the aggregated data.
+ :return: None
+ :side effect: Displays a cumulative time series plot using the multi_model's plotting capabilities.
+ """
+ self.multi_model.models.append(self.meta_model)
+ self.multi_model.generate_plot()
+
+ def output_metamodel(self):
+ """
+ Exports the processed sim data of the metamodel to a parquet file for further analysis or record keeping.
+ :return: None
+ :side effect: Writes data to a parquet file at the specified directory path.
+ """
+ directory_path = os.path.join(self.multi_model.output_folder_path, "raw-output/metamodel/seed=0")
+ os.makedirs(directory_path, exist_ok=True)
+ current_path = os.path.join(directory_path, f"{self.multi_model.metric}.parquet")
+ df = pd.DataFrame({'processed_sim_data': self.meta_model.processed_sim_data})
+ df.to_parquet(current_path, index=False)
+
+ def mean(self, chunks):
+ """
+ Calculates the mean of a list of numerical data.
+
+ :param chunks (list): The data over which to calculate the mean.
+ :return: float: The mean of the provided data.
+ """
+ return np.mean(chunks)
+
+ def median(self, chunks):
+ """
+ Calculates the median of a list of numerical data.
+
+ :param chunks (list): The data over which to calculate the median.
+ :return: float: The median of the provided data.
+ """
+ return np.median(chunks)
+
+ def meta_equation1(self, chunks):
+ """
+ Calculates a weighted mean where the weights are inversely proportional to the absolute difference from the median value.
+ :param chunks (list): Data chunks from which to calculate the weighted mean.
+ :return: float: The calculated weighted mean.
+ """
+
+ """Attempt 1"""
+ # median_val = np.median(chunks)
+ # proximity_weights = 1 / (1 + np.abs(chunks - median_val)) # Avoid division by zero
+ # weighted_mean = np.sum(proximity_weights * chunks) / np.sum(proximity_weights)
+ # return weighted_mean
+
+ """Attempt 2 Inter-Quartile Mean (same accuracy as mean)"""
+ # sorted_preds = np.sort(chunks, axis=0)
+ # Q1 = int(np.floor(0.25 * len(sorted_preds)))
+ # Q3 = int(np.floor(0.75 * len(sorted_preds)))
+ #
+ # iqm = np.mean(sorted_preds[Q1:Q3], axis=0)
+ # return iqm
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/Model.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/Model.py
new file mode 100644
index 00000000..f60f0bb0
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/Model.py
@@ -0,0 +1,70 @@
+"""
+A model is the output of simulator. It contains the data the simulator output, under a certain topology, seed,
+workload, datacenter configuration, etc. A model is further used in the analyzer as part of the MultiModel class,
+and further in the MetaModel class.
+
+:param sim: the simulation data of the model
+"""
+import json
+from dataclasses import dataclass, field
+
+@dataclass
+class Model:
+ """
+ Represents a single simulation output containing various data metrics collected under specific simulation conditions.
+ A Model object stores raw and processed simulation data and is designed to interact with higher-level structures like
+ MultiModel and MetaModel for complex data analysis.
+
+ Attributes:
+ raw_sim_data (list): Initial raw data from the simulator output.
+ processed_sim_data (list): Data derived from raw_sim_data after applying certain processing operations like aggregation or smoothing.
+ cumulative_time_series_values (list): Stores cumulative data values useful for time series analysis.
+ id (int): Unique identifier for the model, typically used for tracking and referencing within analysis tools.
+ path (str): Base path for storing or accessing related data files.
+ cumulated (float): Cumulative sum of processed data, useful for quick summaries and statistical analysis.
+ experiment_name (str): A descriptive name for the experiment associated with this model, potentially extracted from external metadata.
+ margins_of_error (list): Stores error margins associated with the data, useful for uncertainty analysis.
+ topologies (list): Describes the network or system topologies used during the simulation.
+ workloads (list): Lists the types of workloads applied during the simulation, affecting the simulation's applicability and scope.
+ allocation_policies (list): Details the resource allocation policies used, which influence the simulation outcomes.
+ carbon_trace_paths (list): Paths to data files containing carbon output or usage data, important for environmental impact studies.
+
+ Methods:
+ parse_trackr(): Reads additional configuration and metadata from a JSON file named 'trackr.json', enhancing the model with detailed context information.
+
+ Usage:
+ Model objects are typically instantiated with raw data from simulation outputs and an identifier. After instantiation,
+ the 'parse_trackr' method can be called to load additional experimental details from a corresponding JSON file.
+ """
+
+ path: str
+ raw_sim_data: list
+ id: int
+ processed_sim_data: list = field(default_factory=list)
+ cumulative_time_series_values: list = field(default_factory=list)
+ cumulated: float = 0.0
+ experiment_name: str = ""
+ margins_of_error: list = field(default_factory=list)
+ topologies: list = field(default_factory=list)
+ workloads: list = field(default_factory=list)
+ allocation_policies: list = field(default_factory=list)
+ carbon_trace_paths: list = field(default_factory=list)
+
+ def parse_trackr(self):
+ """
+ Parses the 'trackr.json' file located in the model's base path to extract and store detailed experimental metadata.
+ This method enhances the model with comprehensive contextual information about the simulation environment.
+
+ :return: None
+ :side effect: Updates model attributes with data from the 'trackr.json' file, such as experiment names, topologies, and policies.
+ :raises FileNotFoundError: If the 'trackr.json' file does not exist at the specified path.
+ :raises json.JSONDecodeError: If there is an error parsing the JSON data.
+ """
+ trackr_path = self.path + "/trackr.json"
+ with open(trackr_path) as f:
+ trackr = json.load(f)
+ self.experiment_name = trackr.get(self.id, {}).get('name', "")
+ self.topologies = trackr.get(self.id, {}).get('topologies', [])
+ self.workloads = trackr.get(self.id, {}).get('workloads', [])
+ self.allocation_policies = trackr.get(self.id, {}).get('allocationPolicies', [])
+ self.carbon_trace_paths = trackr.get(self.id, {}).get('carbonTracePaths', [])
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MultiModel.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MultiModel.py
new file mode 100644
index 00000000..17a92765
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MultiModel.py
@@ -0,0 +1,501 @@
+import matplotlib.pyplot as plt
+import numpy as np
+import os
+import pyarrow.parquet as pq
+import time
+from matplotlib.ticker import MaxNLocator, FuncFormatter
+
+from simulator_specifics import *
+from .MetaModel import MetaModel
+from .Model import Model
+
+
+def is_meta_model(model):
+ """
+ Check if the given model is a MetaModel based on its ID. A metamodel will always have an id of -101.
+
+ Args:
+ model (Model): The model to check.
+
+ Returns:
+ bool: True if model is MetaModel, False otherwise.
+ """
+ return model.id == MetaModel.META_MODEL_ID
+
+
+class MultiModel:
+ """
+ Handles multiple simulation models, aggregates their data based on user-defined parameters,
+ and generates plots and statistics.
+
+ Attributes:
+ user_input (dict): Configuration dictionary containing user settings for model processing.
+ path (str): The base directory path where output files and analysis results are stored.
+ window_size (int): The size of the window for data aggregation, which affects how data smoothing and granularity are handled.
+ models (list of Model): A list of Model instances that store the simulation data.
+ metric (str): The specific metric to be analyzed and plotted, as defined by the user.
+ measure_unit (str): The unit of measurement for the simulation data, adjusted according to the user's specifications.
+ output_folder_path (str): Path to the folder where output files are saved.
+ raw_output_path (str): Directory path where raw simulation data is stored.
+ analysis_file_path (str): Path to the file where detailed analysis results are recorded.
+ plot_type (str): The type of plot to generate, which can be 'time_series', 'cumulative', or 'cumulative_time_series'.
+ plot_title (str): The title of the plot.
+ x_label (str), y_label (str): Labels for the x and y axes of the plot.
+ x_min (float), x_max (float), y_min (float), y_max (float): Optional parameters to define axis limits for the plots.
+
+ Methods:
+ parse_user_input(window_size): Parses and sets the class attributes based on the provided user input.
+ adjust_unit(): Adjusts the unit of measurement based on user settings, applying appropriate metric prefixes.
+ set_paths(): Initializes the directory paths for storing outputs and analysis results.
+ init_models(): Reads simulation data from Parquet files and initializes Model instances.
+ compute_windowed_aggregation(): Processes the raw data by applying a windowed aggregation function for smoothing.
+ generate_plot(): Orchestrates the generation of the specified plot type by calling the respective plotting functions.
+ generate_time_series_plot(): Generates a time series plot of the aggregated data.
+ generate_cumulative_plot(): Creates a bar chart showing cumulative data for each model.
+ generate_cumulative_time_series_plot(): Produces a plot that displays cumulative data over time for each model.
+ save_plot(): Saves the generated plot to a PDF file in the specified directory.
+ output_stats(): Writes detailed statistics of the simulation to an analysis file for record-keeping.
+ mean_of_chunks(np_array, window_size): Calculates the mean of data segments for smoothing and processing.
+ get_cumulative_limits(model_sums): Determines appropriate x-axis limits for cumulative plots based on the model data.
+
+ Usage:
+ To use this class, instantiate it with a dictionary of user settings, a path for outputs, and optionally a window size.
+ Call the `generate_plot` method to process the data and generate plots as configured by the user.
+ """
+
+ def __init__(self, user_input, path, window_size=-1):
+ """
+ Initializes the MultiModel with provided user settings and prepares the environment.
+
+ :param user_input (dict): Configurations and settings from the user.
+ :param path (str): Path where output and analysis will be stored.
+ :param window_size (int): The size of the window to aggregate data; uses user input if -1.
+ :return: None
+ """
+
+ self.starting_time = time.time()
+ self.end_time = None
+ self.workload_time = None
+
+ self.user_input = user_input
+
+ self.metric = None
+ self.measure_unit = None
+ self.path = path
+ self.models = []
+
+ self.folder_path = None
+ self.output_folder_path = None
+ self.raw_output_path = None
+ self.analysis_file_path = None
+ self.unit_scaling = 1
+ self.window_size = -1
+ self.window_function = "median"
+ self.max_model_len = 0
+ self.seed = 0
+
+ self.plot_type = None
+ self.plot_title = None
+ self.x_label = None
+ self.y_label = None
+ self.x_min = None
+ self.x_max = None
+ self.y_min = None
+ self.y_max = None
+ self.plot_path = None
+
+ self.parse_user_input(window_size)
+ self.set_paths()
+ self.init_models()
+
+ self.compute_windowed_aggregation()
+
+ def parse_user_input(self, window_size):
+ """
+ Parses and sets attributes based on user input.
+
+ :param window_size (int): Specified window size for data aggregation, defaults to user_input if -1.
+ :return: None
+ """
+ if window_size == -1:
+ self.window_size = self.user_input["window_size"]
+ else:
+ self.window_size = window_size
+ self.metric = self.user_input["metric"]
+ self.measure_unit = self.adjust_unit()
+ self.window_function = self.user_input["window_function"]
+ self.seed = self.user_input["seed"]
+
+ self.plot_type = self.user_input["plot_type"]
+ self.plot_title = self.user_input["plot_title"]
+ if self.user_input["x_label"] == "":
+ self.x_label = "Samples"
+ else:
+ self.x_label = self.user_input["x_label"]
+
+ if self.user_input["y_label"] == "":
+ self.y_label = self.metric + " [" + self.measure_unit + "]"
+ else:
+ self.y_label = self.user_input["y_label"]
+
+ self.y_min = self.user_input["y_min"]
+ self.y_max = self.user_input["y_max"]
+ self.x_min = self.user_input["x_min"]
+ self.x_max = self.user_input["x_max"]
+
+ def adjust_unit(self):
+ """
+ Adjusts the unit of measurement according to the scaling magnitude specified by the user.
+ This method translates the given measurement scale into a scientifically accepted metric prefix.
+
+ :return str: The metric prefixed by the appropriate scale (e.g., 'kWh' for kilo-watt-hour if the scale is 3).
+ :raise ValueError: If the unit scaling magnitude provided by the user is not within the accepted range of scaling factors.
+ """
+ prefixes = ['n', 'μ', 'm', '', 'k', 'M', 'G', 'T']
+ scaling_factors = [-9, -6, -3, 1, 3, 6, 9]
+ given_metric = self.user_input["current_unit"]
+ self.unit_scaling = self.user_input["unit_scaling_magnitude"]
+
+ if self.unit_scaling not in scaling_factors:
+ raise ValueError(
+ "Unit scaling factor not found. Please enter a valid unit from [-9, -6, -3, 1, 3, 6, 9].")
+
+ if self.unit_scaling == 1:
+ return given_metric
+
+ for i in range(len(scaling_factors)):
+ if self.unit_scaling == scaling_factors[i]:
+ self.unit_scaling = 10 ** self.unit_scaling
+ result = prefixes[i] + given_metric
+ return result
+
+ def set_paths(self):
+ """
+ Configures and initializes the directory paths for output and analysis based on the base directory provided.
+ This method sets paths for the raw output and detailed analysis results, ensuring directories are created if
+ they do not already exist, and prepares a base file for capturing analytical summaries.
+
+ :return: None
+ :side effect: Creates necessary directories and files for output and analysis.
+ """
+ self.output_folder_path = os.getcwd() + "/" + self.path
+ self.raw_output_path = os.getcwd() + "/" + self.path + "/raw-output"
+ self.analysis_file_path = os.getcwd() + "/" + self.path + "/simulation-analysis/"
+ os.makedirs(self.analysis_file_path, exist_ok=True)
+ self.analysis_file_path = os.path.join(self.analysis_file_path, "analysis.txt")
+ if not os.path.exists(self.analysis_file_path):
+ with open(self.analysis_file_path, "w") as f:
+ f.write("Analysis file created.\n")
+
+ def init_models(self):
+ """
+ Initializes models from the simulation output stored in Parquet files. This method reads each Parquet file,
+ processes the relevant data, and initializes Model instances which are stored in the model list.
+
+ :return: None
+ :raise ValueError: If the unit scaling has not been set prior to model initialization.
+ """
+ model_id = 0
+
+ for simulation_folder in os.listdir(self.raw_output_path):
+ if simulation_folder == "metamodel":
+ continue
+ path_of_parquet_file = f"{self.raw_output_path}/{simulation_folder}/seed={self.seed}/{SIMULATION_DATA_FILE}.parquet"
+ parquet_file = pq.read_table(path_of_parquet_file).to_pandas()
+ raw = parquet_file.select_dtypes(include=[np.number]).groupby("timestamp")
+ raw = raw[self.metric].sum().values
+
+ if self.unit_scaling is None:
+ raise ValueError("Unit scaling factor is not set. Please ensure it is set correctly.")
+
+ raw = np.divide(raw, self.unit_scaling)
+
+ if self.user_input["samples_per_minute"] > 0:
+ MINUTES_IN_DAY = 1440
+ self.workload_time = len(raw) * self.user_input["samples_per_minute"] / MINUTES_IN_DAY
+
+ model = Model(raw_sim_data=raw, id=model_id, path=self.output_folder_path)
+ self.models.append(model)
+ model_id += 1
+
+ self.max_model_len = min([len(model.raw_sim_data) for model in self.models])
+
+ def compute_windowed_aggregation(self):
+ """
+ Applies a windowed aggregation function to each model's dataset. This method is typically used for smoothing
+ or reducing data granularity. It involves segmenting the dataset into windows of specified size and applying
+ an aggregation function to each segment.
+
+ :return: None
+ :side effect: Modifies each model's processed_sim_data attribute to contain aggregated data.
+ """
+ if self.plot_type != "cumulative":
+ for model in self.models:
+ numeric_values = model.raw_sim_data
+ model.processed_sim_data = self.mean_of_chunks(numeric_values, self.window_size)
+
+ def generate_plot(self):
+ """
+ Creates and saves plots based on the processed data from multiple models. This method determines
+ the type of plot to generate based on user input and invokes the appropriate plotting function.
+
+ The plotting options supported are 'time_series', 'cumulative', and 'cumulative_time_series'.
+ Depending on the type specified, this method delegates to specific plot-generating functions.
+
+ :return: None
+ :raises ValueError: If the plot type specified is not recognized or supported by the system.
+ :side effect:
+ - Generates and saves a plot to the file system.
+ - Updates the plot attributes based on the generated plot.
+ - Displays the plot on the matplotlib figure canvas.
+ """
+ plt.figure(figsize=(12, 10))
+ plt.xticks(size=22)
+ plt.yticks(size=22)
+ plt.ylabel(self.y_label, size=26)
+ plt.xlabel(self.x_label, size=26)
+ plt.title(self.plot_title, size=26)
+ plt.grid()
+
+ formatter = FuncFormatter(lambda x, _: '{:,}'.format(int(x)) if x >= 1000 else int(x))
+ ax = plt.gca()
+ ax.xaxis.set_major_formatter(formatter)
+ # ax.yaxis.set_major_formatter(formatter) yaxis has formatting issues - to solve in a future iteration
+
+ if self.user_input['x_ticks_count'] is not None:
+ ax = plt.gca()
+ ax.xaxis.set_major_locator(MaxNLocator(self.user_input['x_ticks_count']))
+
+ if self.user_input['y_ticks_count'] is not None:
+ ax = plt.gca()
+ ax.yaxis.set_major_locator(MaxNLocator(self.user_input['y_ticks_count']))
+
+ self.set_x_axis_lim()
+ self.set_y_axis_lim()
+
+ if self.plot_type == "time_series":
+ self.generate_time_series_plot()
+ elif self.plot_type == "cumulative":
+ self.generate_cumulative_plot()
+ elif self.plot_type == "cumulative_time_series":
+ self.generate_cumulative_time_series_plot()
+ else:
+ raise ValueError(
+ "Plot type not recognized. Please enter a valid plot type. The plot can be either "
+ "'time_series', 'cumulative', or 'cumulative_time_series'."
+ )
+
+ plt.tight_layout()
+ plt.subplots_adjust(right=0.85)
+ plt.legend(fontsize=12, bbox_to_anchor=(1, 1))
+ self.save_plot()
+ self.output_stats()
+
+ def generate_time_series_plot(self):
+ """
+ Plots time series data for each model. This function iterates over each model, applies the defined
+ windowing function to smooth the data, and plots the resulting series.
+
+ :return: None
+ :side effect: Plots are displayed on the matplotlib figure canvas.
+ """
+ for model in self.models:
+ label = "Meta-Model" if is_meta_model(model) else "Model " + str(model.id)
+ if is_meta_model(model):
+ repeated_means = np.repeat(means, self.window_size)[:len(model.processed_sim_data) * self.window_size]
+ plt.plot(
+ repeated_means,
+ drawstyle='steps-mid',
+ label=label,
+ color="red",
+ linestyle="--",
+ marker="o",
+ markevery=max(1, len(repeated_means) // 50),
+ linewidth=2
+ )
+ else:
+ means = self.mean_of_chunks(model.raw_sim_data, self.window_size)
+ repeated_means = np.repeat(means, self.window_size)[:len(model.raw_sim_data)]
+ plt.plot(repeated_means, drawstyle='steps-mid', label=label)
+
+ def generate_cumulative_plot(self):
+ """
+ Generates a horizontal bar chart showing cumulative data for each model. This function
+ aggregates total values per model and displays them in a bar chart, providing a visual
+ comparison of total values across models.
+
+ :return: None
+ :side effect: Plots are displayed on the matplotlib figure canvas.
+ """
+ plt.xlim(self.get_cumulative_limits(model_sums=self.sum_models_entries()))
+ plt.ylabel("Model ID", size=20)
+ plt.xlabel("Total " + self.metric + " [" + self.measure_unit + "]")
+ plt.yticks(range(len(self.models)), [model.id for model in self.models])
+ plt.grid(False)
+
+ cumulated_energies = self.sum_models_entries()
+ for i, model in enumerate(self.models):
+ label = "Meta-Model" if is_meta_model(model) else "Model " + str(model.id)
+ if is_meta_model(model):
+ plt.barh(label=label, y=i, width=cumulated_energies[i], color="red")
+ else:
+ plt.barh(label=label, y=i, width=cumulated_energies[i])
+ plt.text(cumulated_energies[i], i, str(cumulated_energies[i]), ha='left', va='center', size=26)
+
+ def generate_cumulative_time_series_plot(self):
+ """
+ Generates a plot showing the cumulative data over time for each model. This visual representation is
+ useful for analyzing trends and the accumulation of values over time.
+
+ :return: None
+ :side effect: Displays the cumulative data over time on the matplotlib figure canvas.
+ """
+ self.compute_cumulative_time_series()
+
+ for model in self.models:
+ if is_meta_model(model):
+ cumulative_repeated = np.repeat(model.cumulative_time_series_values, self.window_size)[
+ :len(model.processed_sim_data) * self.window_size]
+ plt.plot(
+ cumulative_repeated,
+ drawstyle='steps-mid',
+ label=("Meta-Model"),
+ color="red",
+ linestyle="--",
+ marker="o",
+ markevery=max(1, len(cumulative_repeated) // 10),
+ linewidth=3
+ )
+ else:
+ cumulative_repeated = np.repeat(model.cumulative_time_series_values, self.window_size)[
+ :len(model.raw_sim_data)]
+ plt.plot(cumulative_repeated, drawstyle='steps-mid', label=("Model " + str(model.id)))
+
+ def compute_cumulative_time_series(self):
+ """
+ Computes the cumulative sum of processed data over time for each model, storing the result for use in plotting.
+
+ :return: None
+ :side effect: Updates each model's 'cumulative_time_series_values' attribute with the cumulative sums.
+ """
+ for model in self.models:
+ cumulative_array = []
+ _sum = 0
+ for value in model.processed_sim_data:
+ _sum += value
+ cumulative_array.append(_sum * self.window_size)
+ model.cumulative_time_series_values = cumulative_array
+
+ def save_plot(self):
+ """
+ Saves the current plot to a PDF file in the specified directory, constructing the file path from the
+ plot attributes and ensuring that the directory exists before saving.
+
+ :return: None
+ :side effect: Creates or overwrites a PDF file containing the plot in the designated folder.
+ """
+ folder_prefix = self.output_folder_path + "/simulation-analysis/" + self.metric + "/"
+ self.plot_path = folder_prefix + self.plot_type + "_plot_multimodel_metric=" + self.metric + "_window=" + str(
+ self.window_size) + ".pdf"
+ plt.savefig(self.plot_path)
+
+ def set_x_axis_lim(self):
+ """
+ Sets the x-axis limits for the plot based on user-defined minimum and maximum values. If values
+ are not specified, the axis limits will default to encompassing all data points.
+
+ :return: None
+ :side effect: Adjusts the x-axis limits of the current matplotlib plot.
+ """
+ if self.x_min is not None:
+ plt.xlim(left=self.x_min)
+
+ if self.x_max is not None:
+ plt.xlim(right=self.x_max)
+
+ def set_y_axis_lim(self):
+ """
+ Dynamically sets the y-axis limits to be slightly larger than the range of the data, enhancing
+ the readability of the plot by ensuring all data points are comfortably within the view.
+
+ :return: None
+ :side effect: Adjusts the y-axis limits of the current matplotlib plot.
+ """
+ if self.y_min is not None:
+ plt.ylim(bottom=self.y_min)
+ if self.y_max is not None:
+ plt.ylim(top=self.y_max)
+
+ def sum_models_entries(self):
+ """
+ Computes the total values from each model for use in cumulative plotting. This method aggregates
+ the data across all models and prepares it for cumulative display.
+
+ :return: List of summed values for each model, useful for plotting and analysis.
+ """
+ models_sums = []
+ for (i, model) in enumerate(self.models):
+ if is_meta_model(model):
+ models_sums.append(model.cumulated)
+ else:
+ cumulated_energy = model.raw_sim_data.sum()
+ cumulated_energy = round(cumulated_energy, 2)
+ models_sums.append(cumulated_energy)
+
+ return models_sums
+
+ def output_stats(self):
+ """
+ Records and writes detailed simulation statistics to an analysis file. This includes time stamps,
+ performance metrics, and other relevant details.
+
+ :return: None
+ :side effect: Appends detailed simulation statistics to an existing file for record-keeping and analysis.
+ """
+ self.end_time = time.time()
+ with open(self.analysis_file_path, "a") as f:
+ f.write("\n\n========================================\n")
+ f.write("Simulation made at " + time.strftime("%Y-%m-%d %H:%M:%S") + "\n")
+ f.write("Metric: " + self.metric + "\n")
+ f.write("Unit: " + self.measure_unit + "\n")
+ f.write("Window size: " + str(self.window_size) + "\n")
+ f.write("Sample count in raw sim data: " + str(self.max_model_len) + "\n")
+ f.write("Computing time " + str(round(self.end_time - self.starting_time, 1)) + "s\n")
+ if (self.user_input["samples_per_minute"] > 0):
+ f.write("Workload time: " + str(round(self.workload_time, 2)) + " days\n")
+ f.write("Plot path" + self.plot_path + "\n")
+ f.write("========================================\n")
+
+ def mean_of_chunks(self, np_array, window_size):
+ """
+ Calculates the mean of data within each chunk for a given array. This method helps in smoothing the data by
+ averaging over specified 'window_size' segments.
+
+ :param np_array (np.array): Array of numerical data to be chunked and averaged.
+ :param window_size (int): The size of each segment to average over.
+ :return: np.array: An array of mean values for each chunk.
+ :side effect: None
+ """
+ if window_size == 1:
+ return np_array
+
+ chunks = [np_array[i:i + window_size] for i in range(0, len(np_array), window_size)]
+ means = [np.mean(chunk) for chunk in chunks]
+ return np.array(means)
+
+ def get_cumulative_limits(self, model_sums):
+ """
+ Calculates the appropriate x-axis limits for cumulative plots based on the summarized data from each model.
+
+ :param model_sums (list of float): The total values for each model.
+ :return: tuple: A tuple containing the minimum and maximum x-axis limits.
+ """
+ axis_min = min(model_sums) * 0.9
+ axis_max = max(model_sums) * 1.1
+
+ if self.user_input["x_min"] is not None:
+ axis_min = self.user_input["x_min"]
+ if self.user_input["x_max"] is not None:
+ axis_max = self.user_input["x_max"]
+
+ return [axis_min * 0.9, axis_max * 1.1]
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/requirements.txt b/opendc-experiments/opendc-experiments-m3sa/src/main/python/requirements.txt
new file mode 100644
index 00000000..cbd22985
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/requirements.txt
@@ -0,0 +1,4 @@
+matplotlib==3.8.4
+numpy==2.1.1
+pandas==2.2.2
+pyarrow==16.1.0
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/simulator_specifics.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/simulator_specifics.py
new file mode 100644
index 00000000..4e1c36e1
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/simulator_specifics.py
@@ -0,0 +1,14 @@
+"""
+This file is the integration layer of the M3SA tool upon any (ICT) simulator.
+
+The system will use the elements from this file in the analysis / meta-simulation process.
+"""
+
+"""
+SIMULATION_DATA_FILE (str): The name of the file containing the simulation data. Enter only the name, not the path, not
+the extension. The data file must be parquet format.
+
+✅ Good: "host", "simulation_data", "cats_predictions"
+❌ Wrong: "host.json", "opendc/folder_x/folder_y/data"
+"""
+SIMULATION_DATA_FILE = "host" # opendc outputs in file host.parquet
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/utils.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/utils.py
new file mode 100644
index 00000000..fd4fec2e
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/utils.py
@@ -0,0 +1,25 @@
+import sys
+
+"""
+Constants for the main.py file
+"""
+
+SIMULATION_ANALYSIS_FOLDER_NAME = 'simulation-analysis'
+EMISSIONS_ANALYSIS_FOLDER_NAME = 'carbon_emission'
+ENERGY_ANALYSIS_FOLDER_NAME = 'power_draw'
+
+"""
+Utility functions
+"""
+
+
+def clean_analysis_file(metric):
+ analysis_file_path = SIMULATION_ANALYSIS_FOLDER_NAME + "/"
+ if metric == "power_draw":
+ analysis_file_path += ENERGY_ANALYSIS_FOLDER_NAME
+ else:
+ analysis_file_path += EMISSIONS_ANALYSIS_FOLDER_NAME
+ analysis_file_path += "/analysis.txt"
+
+ with open(analysis_file_path, "w") as f:
+ f.write("")