summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py
diff options
context:
space:
mode:
authorRadu Nicolae <rnicolae04@gmail.com>2025-06-16 18:01:07 +0200
committerGitHub <noreply@github.com>2025-06-16 18:01:07 +0200
commit0df3d9ced743ac3385dd710c7133a6cf369b051c (patch)
treeeff5d6d67c275643e229731ba08c5fe7dc4ccd0a /opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py
parentc7e303ad1b5217e2ff24cee9538ac841d6149706 (diff)
integrated M3SA, updated with tests and CpuPowerModels
Diffstat (limited to 'opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py')
-rw-r--r--opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py410
1 files changed, 410 insertions, 0 deletions
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py
new file mode 100644
index 00000000..4f993fee
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py
@@ -0,0 +1,410 @@
+import matplotlib.pyplot as plt
+import numpy as np
+import os
+import pyarrow.parquet as pq
+from time import time, strftime
+from matplotlib.ticker import MaxNLocator, FuncFormatter
+from matplotlib.ticker import AutoMinorLocator
+from typing import IO
+from textwrap import dedent
+from models import Model
+from util import SimulationConfig, adjust_unit, PlotType, SIMULATION_DATA_FILE
+
+
+class MultiModel:
+ """
+ Handles multiple simulation models, aggregates their data based on user-defined parameters,
+ and generates plots and statistics.
+
+ Attributes:
+ window_size (int): The size of the window for data aggregation, which affects how data smoothing and granularity are handled.
+ models (list of Model): A list of Model instances that store the simulation data.
+ measure_unit (str): The unit of measurement for the simulation data, adjusted according to the user's specifications.
+ unit_scaling (int): The scaling factor applied to the unit of measurement.
+ max_model_len (int): The length of the shortest model's raw data, used for consistency in processing.
+ plot_path (str): The path where the generated plot will be saved.
+ analysis_file (IO): The file object for writing detailed analysis statistics.
+ COLOR_PALETTE (list of str): A list of color codes for plotting multiple models.
+
+ Methods:
+ parse_user_input(window_size): Parses and sets the class attributes based on the provided user input.
+ adjust_unit(): Adjusts the unit of measurement based on user settings, applying appropriate metric prefixes.
+ set_paths(): Initializes the directory paths for storing outputs and analysis results.
+ init_models(): Reads simulation data from Parquet files and initializes Model instances.
+ compute_windowed_aggregation(): Processes the raw data by applying a windowed aggregation function for smoothing.
+ generate_plot(): Orchestrates the generation of the specified plot type by calling the respective plotting functions.
+ generate_time_series_plot(): Generates a time series plot of the aggregated data.
+ generate_cumulative_plot(): Creates a bar chart showing cumulative data for each model.
+ generate_cumulative_time_series_plot(): Produces a plot that displays cumulative data over time for each model.
+ save_plot(): Saves the generated plot to a PDF file in the specified directory.
+ output_stats(): Writes detailed statistics of the simulation to an analysis file for record-keeping.
+ mean_of_chunks(np_array, window_size): Calculates the mean of data segments for smoothing and processing.
+ get_cumulative_limits(model_sums): Determines appropriate x-axis limits for cumulative plots based on the model data.
+
+ Usage:
+ To use this class, instantiate it with a dictionary of user settings, a path for outputs, and optionally a window size.
+ Call the `generate_plot` method to process the data and generate plots as configured by the user.
+ """
+
+ COLOR_PALETTE: list[str] = [
+ # Colorblind-friendly palette
+ "#0072B2", "#E69F00", "#009E73", "#D55E00", "#CC79A7", "#F0E442", "#8B4513",
+ "#56B4E9", "#F0A3FF", "#FFB400", "#00BFFF", "#90EE90", "#FF6347", "#8A2BE2", "#CD5C5C",
+ "#4682B4", "#FFDEAD", "#32CD32", "#D3D3D3", "#999999"
+ ]
+
+ def __init__(self, config: SimulationConfig, window_size: int = -1):
+ """
+ Initializes the MultiModel with provided user settings and prepares the environment.
+
+ :param user_input (dict): Configurations and settings from the user.
+ :param path (str): Path where output and analysis will be stored.
+ :param window_size (int): The size of the window to aggregate data; uses user input if -1.
+ :return: None
+ """
+
+ self.config: SimulationConfig = config
+ self.starting_time: float = time()
+ self.workload_time = None
+ self.timestamps = None
+ self.plot_path: str | None = None
+
+ self.window_size = config.window_size if window_size == -1 else window_size
+ self.measure_unit: str
+ self.unit_scaling: int
+ self.measure_unit, self.unit_scaling = adjust_unit(config.current_unit, config.unit_scaling_magnitude)
+
+ self.models: list[Model] = []
+ self.max_model_len = 0
+
+ try:
+ os.makedirs(self.config.output_path, exist_ok=True)
+ self.analysis_file: IO = open(config.output_path + "/analysis.txt", "w")
+ except Exception as e:
+ print(f"Error handling output directory: {e}")
+ exit(1)
+
+ self.analysis_file.write("Analysis file create\n")
+
+ self.init_models()
+ if self.config.is_metamodel:
+ self.COLOR_PALETTE = ["#b3b3b3" for _ in range(len(self.models))]
+ if len(self.config.plot_colors) > 0:
+ self.COLOR_PALETTE = self.config.plot_colors
+ self.compute_windowed_aggregation()
+
+ def get_model_path(self, dir: str) -> str:
+ return (
+ f"{self.config.simulation_path}/"
+ f"{dir}/"
+ f"seed={self.config.seed}/"
+ f"{SIMULATION_DATA_FILE}.parquet"
+ )
+
+ def init_models(self):
+ """
+ Initializes models from the simulation output stored in Parquet files. This method reads each Parquet file,
+ processes the relevant data, and initializes Model instances which are stored in the model list.
+
+ :return: None
+ :raise ValueError: If the unit scaling has not been set prior to model initialization.
+ """
+ if self.unit_scaling is None:
+ raise ValueError("Unit scaling factor is not set. Please ensure it is set correctly.")
+
+ simulation_directories = os.listdir(self.config.simulation_path)
+ simulation_directories.sort()
+
+ for sim_dir in simulation_directories:
+ print("Processing simulation: ", sim_dir)
+ if sim_dir == "metamodel":
+ continue
+
+ simulation_id: str = os.path.basename(sim_dir)
+ columns_to_read = ['timestamp', self.config.metric]
+ parquet_file = pq.read_table(self.get_model_path(sim_dir), columns=columns_to_read).to_pandas()
+
+ grouped_data = parquet_file.groupby('timestamp')[self.config.metric].sum()
+ # Apply unit scaling to the raw data
+ raw = np.divide(grouped_data.values, self.unit_scaling)
+ timestamps = parquet_file['timestamp'].unique()
+
+ model = Model(raw_sim_data=raw, identifier=simulation_id)
+ self.models.append(model)
+
+ if self.timestamps is None or len(self.timestamps) > len(timestamps):
+ self.timestamps = timestamps
+
+ self.max_model_len = min([len(model.raw_sim_data) for model in self.models])
+
+ def compute_windowed_aggregation(self) -> None:
+ """
+ Applies a windowed aggregation function to each model's dataset. This method is typically used for smoothing
+ or reducing data granularity. It involves segmenting the dataset into windows of specified size and applying
+ an aggregation function to each segment.
+
+ :return: None
+ :side effect: Modifies each model's processed_sim_data attribute to contain aggregated data.
+ """
+ if self.config.plot_type == PlotType.CUMULATIVE:
+ return
+
+ for model in self.models:
+ numeric_values = model.raw_sim_data
+ model.processed_sim_data = self.mean_of_chunks(numeric_values, self.config.window_size)
+
+ def generate_plot(self):
+ """
+ Creates and saves plots based on the processed data from multiple models. This method determines
+ the type of plot to generate based on user input and invokes the appropriate plotting function.
+
+ The plotting options supported are 'time_series', 'cumulative', and 'cumulative_time_series'.
+ Depending on the type specified, this method delegates to specific plot-generating functions.
+
+ :return: None
+ :raises ValueError: If the plot type specified is not recognized or supported by the system.
+ :side effect:
+ - Generates and saves a plot to the file system.
+ - Updates the plot attributes based on the generated plot.
+ - Displays the plot on the matplotlib figure canvas.
+ """
+ plt.figure(figsize=self.config.fig_size)
+
+ plt.xticks(size=32)
+ plt.yticks(size=32)
+ plt.ylabel(self.config.y_axis.label, size=26)
+ plt.xlabel(self.config.x_axis.label, size=26)
+ plt.title(self.config.plot_title, size=26)
+ plt.grid()
+
+ formatter = FuncFormatter(lambda x, _: '{:,}'.format(int(x)) if x >= 1000 else int(x))
+ ax = plt.gca()
+ ax.xaxis.set_major_formatter(formatter)
+
+ if self.config.x_axis.has_ticks():
+ ax = plt.gca()
+ ax.xaxis.set_major_locator(MaxNLocator(self.config.x_axis.ticks))
+
+ if self.config.y_axis.has_ticks():
+ ax = plt.gca()
+ ax.yaxis.set_major_locator(MaxNLocator(self.config.y_axis.ticks))
+
+ self.set_axis_limits()
+
+ match self.config.plot_type:
+ case PlotType.TIME_SERIES:
+ self.generate_time_series_plot()
+ case PlotType.CUMULATIVE:
+ self.generate_cumulative_plot()
+ case PlotType.CUMULATIVE_TIME_SERIES:
+ self.generate_cumulative_time_series_plot()
+
+ plt.tight_layout()
+ plt.subplots_adjust(right=0.85)
+ self.save_plot()
+ self.output_stats()
+
+ def generate_time_series_plot(self):
+ """
+ Plots time series data for each model. This function iterates over each model, applies the defined
+ windowing function to smooth the data, and plots the resulting series.
+
+ :return: None
+ :side effect: Plots are displayed on the matplotlib figure canvas.
+ """
+
+ for i, model in enumerate(self.models):
+ label = "Meta-Model" if model.is_meta_model() else "Model " + str(model.id)
+
+ if model.is_meta_model():
+ repeated_means = np.repeat(model.processed_sim_data, self.window_size)
+ plt.plot(repeated_means, drawstyle='steps-mid', label=label, color="#228B22", linestyle="solid",
+ linewidth=2)
+ else:
+ means = self.mean_of_chunks(model.raw_sim_data, self.window_size)
+ repeated_means = np.repeat(means, self.window_size)[:len(model.raw_sim_data)]
+ plt.plot(repeated_means, drawstyle='steps-mid', label=label, color=self.COLOR_PALETTE[i])
+
+ def generate_cumulative_plot(self):
+ """
+ Generates a horizontal bar chart showing cumulative data for each model. This function
+ aggregates total values per model and displays them in a bar chart, providing a visual
+ comparison of total values across models.
+
+ :return: None
+ :side effect: Plots are displayed on the matplotlib figure canvas.
+ """
+ plt.xlim(self.get_cumulative_limits(model_sums=self.sum_models_entries()))
+ plt.ylabel("Model ID", size=30)
+ plt.xlabel(self.config.x_axis.label, size=30)
+
+ ax = plt.gca()
+ ax.tick_params(axis='x', which='major', length=12) # Set length of the ticks
+ ax.set_xticklabels([]) # Hide x-axis numbers
+ ax.xaxis.set_minor_locator(AutoMinorLocator(5)) # Set two minor ticks between majors
+ ax.tick_params(axis='x', which='minor', length=7, color='black')
+ plt.yticks(range(len(self.models)), [model.id for model in self.models])
+
+ plt.grid(False)
+
+ cumulated_energies = self.sum_models_entries()
+
+ for i, model in (enumerate(self.models)):
+ label = "Meta-Model" if model.is_meta_model() else "Model " + str(model.id)
+ if model.is_meta_model():
+ plt.barh(i, cumulated_energies[i], label=label, color='#009E73', hatch='//')
+ plt.text(cumulated_energies[i], i, str(int(round(cumulated_energies[i], 0))), ha='left', va='center',
+ size=26)
+ else:
+ round_decimals = 0 if cumulated_energies[i] > 500 else 1
+ plt.barh(label=label, y=i, width=cumulated_energies[i], color=self.COLOR_PALETTE[i])
+ plt.text(cumulated_energies[i], i, str(int(round(cumulated_energies[i], round_decimals))), ha='left',
+ va='center', size=26)
+
+ def generate_cumulative_time_series_plot(self):
+ """
+ Generates a plot showing the cumulative data over time for each model. This visual representation is
+ useful for analyzing trends and the accumulation of values over time.
+
+ :return: None
+ :side effect: Displays the cumulative data over time on the matplotlib figure canvas.
+ """
+ self.compute_cumulative_time_series()
+
+ for i, model in enumerate(self.models):
+ label = "Meta-Model" if model.is_meta_model() else "Model " + str(model.id)
+ if model.is_meta_model():
+ cumulative_repeated = np.repeat(model.cumulative_time_series_values, self.window_size)[
+ :len(model.processed_sim_data) * self.window_size]
+ plt.plot(cumulative_repeated, label=label, drawstyle='steps-mid', color="#228B22", linestyle="solid",
+ linewidth=2)
+ else:
+ cumulative_repeated = np.repeat(model.cumulative_time_series_values, self.window_size)[
+ :len(model.raw_sim_data)]
+ plt.plot(cumulative_repeated, drawstyle='steps-mid', label=("Model " + str(model.id)),
+ color=self.COLOR_PALETTE[i])
+
+ def compute_cumulative_time_series(self):
+ """
+ Computes the cumulative sum of processed data over time for each model, storing the result for use in plotting.
+
+ :return: None
+ :side effect: Updates each model's 'cumulative_time_series_values' attribute with the cumulative sums.
+ """
+ for model in self.models:
+ cumulative_array = []
+ _sum = 0
+ for value in model.processed_sim_data:
+ _sum += value
+ cumulative_array.append(_sum * self.window_size)
+ model.cumulative_time_series_values = cumulative_array
+
+ def save_plot(self):
+ """
+ Saves the current plot to a PDF file in the specified directory, constructing the file path from the
+ plot attributes and ensuring that the directory exists before saving.
+
+ :return: None
+ :side effect: Creates or overwrites a PDF file containing the plot in the designated folder.
+ """
+ output_dir = f"{self.config.output_path}/simulation-analysis/{self.config.metric}"
+ try:
+ os.makedirs(output_dir, exist_ok=True)
+ except OSError as e:
+ print(f"Error handling output directory: {e}")
+ exit(1)
+
+ self.plot_path: str = (
+ f"{output_dir}/"
+ f"{self.config.plot_type}"
+ f"_plot_multimodel_metric={self.config.metric}"
+ f"_window={self.window_size}"
+ f".pdf"
+ ) if self.config.figure_export_name is None \
+ else f"{output_dir}/{self.config.figure_export_name}.pdf"
+
+ plt.savefig(self.plot_path)
+
+ def set_axis_limits(self) -> None:
+ """
+ Sets the x-axis and y-axis limits for the current plot based on the user-defined configuration.
+ This method ensures that the plot displays the data within the specified range, enhancing readability.
+ """
+ if self.config.x_axis.has_range():
+ plt.xlim(left=self.config.x_axis.value_range[0], right=self.config.x_axis.value_range[1])
+
+ if self.config.y_axis.has_range():
+ plt.ylim(bottom=self.config.y_axis.value_range[0], top=self.config.y_axis.value_range[1])
+
+ def sum_models_entries(self):
+ """
+ Computes the total values from each model for use in cumulative plotting. This method aggregates
+ the data across all models and prepares it for cumulative display.
+
+ :return: List of summed values for each model, useful for plotting and analysis.
+ """
+ models_sums = []
+ for i, model in enumerate(self.models):
+ if model.is_meta_model():
+ models_sums.append(model.cumulated)
+ else:
+ cumulated_energy = model.raw_sim_data.sum()
+ cumulated_energy = round(cumulated_energy, 2)
+ models_sums.append(cumulated_energy)
+
+ return models_sums
+
+ def output_stats(self) -> None:
+ """
+ Records and writes detailed simulation statistics to an analysis file. This includes time stamps,
+ performance metrics, and other relevant details.
+
+ :return: None
+ :side effect: Appends detailed simulation statistics to an existing file for record-keeping and analysis.
+ """
+ end_time: float = time()
+ self.analysis_file.write(dedent(
+ f"""
+ =========================================================
+ Simulation made at {strftime("%Y-%m-%d %H:%M:%S")}
+ Metric: {self.config.metric}
+ Unit: {self.measure_unit}
+ Window size: {self.window_size}
+ Sample count in raw sim data: {self.max_model_len}
+ Computing time {round(end_time - self.starting_time, 1)}s
+ Plot path: {self.plot_path}
+ =========================================================
+ """
+ ))
+
+ def mean_of_chunks(self, np_array: np.array, window_size: int) -> np.array:
+ """
+ Calculates the mean of data within each chunk for a given array. This method helps in smoothing the data by
+ averaging over specified 'window_size' segments.
+
+ :param np_array: Array of numerical data to be chunked and averaged.
+ :param window_size: The size of each segment to average over.
+ :return: np.array: An array of mean values for each chunk.
+ """
+ if window_size == 1:
+ return np_array
+
+ chunks: list[np.array] = [np_array[i:i + window_size] for i in range(0, len(np_array), window_size)]
+ means: list[float] = [np.mean(chunk) for chunk in chunks]
+ return np.array(means)
+
+ def get_cumulative_limits(self, model_sums: list[float]) -> list[float]:
+ """
+ Calculates the appropriate x-axis limits for cumulative plots based on the summarized data from each model.
+
+ :param model_sums: List of summed values for each model.
+ :return: list[float]: A list containing the minimum and maximum values for the x-axis limits.
+ """
+ axis_min = min(model_sums) * 0.9
+ axis_max = max(model_sums) * 1.1
+
+ if self.config.x_axis.value_range is not None:
+ axis_min = self.config.x_axis.value_range[0]
+ axis_max = self.config.x_axis.value_range[1]
+
+ return [axis_min * 0.9, axis_max * 1.1]