Diffstat (limited to 'opendc-experiments/opendc-experiments-m3sa/src/main/python/util')
-rw-r--r-- opendc-experiments/opendc-experiments-m3sa/src/main/python/util/__init__.py             2
-rw-r--r-- opendc-experiments/opendc-experiments-m3sa/src/main/python/util/accuracy_evaluator.py 114
-rw-r--r-- opendc-experiments/opendc-experiments-m3sa/src/main/python/util/config.py             186
-rw-r--r-- opendc-experiments/opendc-experiments-m3sa/src/main/python/util/util.py                76
4 files changed, 378 insertions, 0 deletions
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/__init__.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/__init__.py
new file mode 100644
index 00000000..120c2f56
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/__init__.py
@@ -0,0 +1,2 @@
+from .config import parse_configuration, SimulationConfig, PlotAxis, PlotType
+from .util import *
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/accuracy_evaluator.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/accuracy_evaluator.py
new file mode 100644
index 00000000..0fae4898
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/accuracy_evaluator.py
@@ -0,0 +1,114 @@
+import numpy as np
+
+from models.meta_model import MetaModel
+
+
+def accuracy_evaluator(
+    real_data,
+    multi_model,
+    compute_mape=True,
+    compute_nad=True,
+    compute_rmsle=True,
+    rmsle_hyperparameter=0.5,
+    only_metamodel=False
+):
+    """
+    :param real_data: the real-world data of the simulation
+    :param multi_model: the Multi-Model, containing individual models (possibly also a Meta-Model, with id=-101)
+    :param compute_mape: whether to calculate Mean Absolute Percentage Error (MAPE)
+    :param compute_nad: whether to calculate Normalized Absolute Differences (NAD)
+    :param compute_rmsle: whether to calculate Root Mean Square Logarithmic Error (RMSLE)
+    :param rmsle_hyperparameter: the hyperparameter that balances the ratio underestimations:overestimations
+        - default is 0.5 (balanced penalty)
+        - < 0.5: more penalty for overestimations
+        - > 0.5: more penalty for underestimations
+        e.g., rmsle_hyperparameter=0.3 -> 70% of the penalty weight on overestimations, 30% on underestimations (7:3 ratio)
+    :return: None, but writes the accuracy metrics to accuracy_report.txt
+    """
+
+    meta_model = MetaModel(multi_model=multi_model)
+    multi_model.models.append(meta_model.meta_model)  # metamodel
+    # multi_model.models.append(Model(raw_host_data=real_data, id=-1, path=None))  # real-world data
+
+    with open(multi_model.output_folder_path + "/accuracy_report.txt", "a") as f:
+        f.write("====================================\n")
+        f.write("Accuracy Report, against ground truth\n")
+
+        for model in multi_model.models:
+            if only_metamodel and model.id != -101:
+                continue
+
+            if model.id == -1:
+                f.write("Real-World data\n")
+            elif model.id == -101:
+                f.write(
+                    f"Meta-Model, meta-function: {multi_model.user_input['meta_function']}, window_size: {meta_model.multi_model.window_size}\n")
+            else:
+                f.write(f"Model {model.id}\n")
+
+            simulation_data = model.raw_sim_data
+            min_len = min(len(real_data), len(simulation_data))
+            real_data = real_data[:min_len]
+            simulation_data = simulation_data[:min_len]
+            if compute_mape:
+                accuracy_mape = mape(
+                    real_data=real_data,
+                    simulation_data=simulation_data
+                )
+                f.write(f"Mean Absolute Percentage Error (MAPE): {accuracy_mape}%\n")
+
+            if compute_nad:
+                accuracy_nad = nad(
+                    real_data=real_data,
+                    simulation_data=simulation_data
+                )
+                f.write(f"Normalized Absolute Differences (NAD): {accuracy_nad}%\n")
+
+            if compute_rmsle:
+                accuracy_rmsle = rmsle(
+                    real_data=real_data,
+                    simulation_data=simulation_data,
+                    alpha=rmsle_hyperparameter
+                )
+                f.write(
+                    f"Root Mean Square Logarithmic Error (RMSLE), alpha={rmsle_hyperparameter}: {accuracy_rmsle}\n\n")
+
+        f.write("====================================\n")
+
+
+def mape(real_data, simulation_data):
+    """
+    Calculate Mean Absolute Percentage Error (MAPE)
+    :param real_data: Array of real values
+    :param simulation_data: Array of simulated values
+    :return: MAPE value
+    """
+    real_data = np.array(real_data)
+    simulation_data = np.array(simulation_data)
+    return round(np.mean(np.abs((real_data - simulation_data) / real_data)) * 100, 3)
+
+
+def nad(real_data, simulation_data):
+    """
+    Calculate Normalized Absolute Differences (NAD)
+    :param real_data: Array of real values
+    :param simulation_data: Array of simulated values
+    :return: NAD value
+    """
+    real_data = np.array(real_data)
+    simulation_data = np.array(simulation_data)
+    return round(np.sum(np.abs(real_data - simulation_data)) / np.sum(real_data) * 100, 3)
+
+
+def rmsle(real_data, simulation_data, alpha=0.5):
+    """
+    Calculate Root Mean Square Logarithmic Error (RMSLE) with an adjustable alpha parameter
+    :param real_data: Array of real values
+    :param simulation_data: Array of simulated values
+    :param alpha: Hyperparameter that balances the penalty between underestimations and overestimations
+    :return: RMSLE value
+    """
+    real_data = np.array(real_data)
+    simulation_data = np.array(simulation_data)
+    log_diff = alpha * np.log(real_data) - (1 - alpha) * np.log(simulation_data)
+    return round(np.sqrt(np.mean(log_diff ** 2)) * 100, 3)
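Reviewer sketch (not part of the patch): a quick sanity check of the three metrics above. It assumes the snippet is run from src/main/python, so that the util and models packages are importable; the arrays are synthetic.

    import numpy as np
    from util.accuracy_evaluator import mape, nad, rmsle

    real = np.array([100.0, 200.0, 300.0])
    over = real * 1.1   # uniform 10% overestimation
    under = real * 0.9  # uniform 10% underestimation

    print(mape(real, over), mape(real, under))  # 10.0 10.0 -- symmetric
    print(nad(real, over), nad(real, under))    # 10.0 10.0 -- symmetric

    # alpha < 0.5 shifts weight onto the simulated (log) term, so the
    # overestimating model scores worse than the underestimating one:
    print(rmsle(real, over, alpha=0.3) > rmsle(real, under, alpha=0.3))  # True
    # Caveat: for alpha != 0.5 the two log terms carry unequal weights, so
    # even simulation_data == real_data yields a nonzero RMSLE score.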
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/config.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/config.py
new file mode 100644
index 00000000..e0d9827b
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/config.py
@@ -0,0 +1,186 @@
+from json import JSONDecodeError, load
+from warnings import warn
+from numpy import mean, median
+from typing import Any, Callable
+from enum import Enum
+from sys import stderr
+
+FUNCTIONS = {
+    "mean": mean,
+    "median": median,
+}
+
+
+class PlotType(Enum):
+    TIME_SERIES = "time_series"
+    CUMULATIVE = "cumulative"
+    CUMULATIVE_TIME_SERIES = "cumulative_time_series"
+
+    def __str__(self) -> str:
+        return self.value
+
+
+def get_plot_type(plot_type: str) -> PlotType:
+    """
+    Returns the PlotType enum value for the given string
+    Args:
+        plot_type: the string representation of the plot type
+    Returns:
+        the PlotType enum value, defaulting to TIME_SERIES for unknown strings
+    """
+    return next((pt for pt in PlotType if pt.value == plot_type), PlotType.TIME_SERIES)
+
+
+class PlotAxis:
+    """
+    This class represents an axis of a plot. It contains the label, value range, and number of ticks for the axis.
+    Attributes:
+        label (str): the label of the axis
+        value_range (tuple[float, float]): the range of values for the axis
+        ticks (int): the number of ticks on the axis
+    """
+
+    def __init__(self, label: str, value_range: tuple[float, float] | None, ticks: int | None):
+        self.label = label
+        self.value_range = value_range
+        self.ticks = ticks
+
+    def has_range(self) -> bool:
+        """
+        Checks if the axis has a value range
+        Returns:
+            True if the axis has a value range, False otherwise
+        """
+        return self.value_range is not None
+
+    def has_ticks(self) -> bool:
+        """
+        Checks if the axis has a number of ticks
+        Returns:
+            True if the axis has a number of ticks, False otherwise
+        """
+        return self.ticks is not None
+
+
+class SimulationConfig:
+    """
+    This class represents the configuration of a simulation.
+    It contains all the necessary parameters to run a simulation using multiple models.
+
+    Attributes:
+        is_multimodel (bool): whether the simulation is multimodel
+        is_metamodel (bool): whether the simulation is a metamodel
+        metric (str): the metric to be used
+        window_function (function): the window function to be used
+        meta_function (function): the meta function to be used
+        window_size (int): the window size
+        samples_per_minute (int): the number of samples per minute
+        current_unit (str): the current unit
+        unit_scaling_magnitude (int): the unit scaling magnitude
+        plot_type (PlotType): the plot type
+        plot_title (str): the plot title
+        x_axis (PlotAxis): the x-axis
+        y_axis (PlotAxis): the y-axis
+        seed (int): the seed
+        fig_size (tuple[int, int]): the figure size
+    """
+
+    def __init__(self, input_json: dict[str, Any], output_path: str, simulation_path: str):
+        """
+        Initializes the SimulationConfig object with the given input JSON
+        Args:
+            input_json: the input JSON object
+            output_path: the path to the output folder
+            simulation_path: the path to the simulation folder
+        Raises:
+            ValueError: if the input JSON is missing required
+            fields or has invalid values for certain fields
+        """
+
+        if "metric" not in input_json:
+            raise ValueError("Required field 'metric' is missing.")
+        if "meta_function" not in input_json and input_json.get("metamodel", False):
+            raise ValueError(
+                "Required field 'meta_function' is missing. Please select between 'mean' and 'median'. "
+                "Alternatively, disable metamodel in the config file."
+            )
+        if input_json.get("meta_function", "mean") not in FUNCTIONS:
+            raise ValueError(
+                "Invalid value for meta_function. Please select between 'mean' and 'median'."
+            )
+        if "multimodel" not in input_json and input_json.get("metamodel", False):
+            warn("Missing 'multimodel' field. Defaulting to 'True'.")
+
+        self.output_path: str = output_path
+        self.simulation_path: str = simulation_path
+        self.is_multimodel: bool = input_json.get("multimodel", True)
+        self.is_metamodel: bool = input_json.get("metamodel", False)
+        self.metric: str = input_json["metric"]
+        self.window_function: Callable[[Any], float] = FUNCTIONS[input_json.get("window_function", "mean")]
+        self.meta_function: Callable[[Any], float] = FUNCTIONS[input_json.get("meta_function", "mean")]
+        self.window_size: int = input_json.get("window_size", 1)
+        self.samples_per_minute: int = input_json.get("samples_per_minute", 0)
+        self.current_unit: str = input_json.get("current_unit", "")
+        self.unit_scaling_magnitude: int = input_json.get("unit_scaling_magnitude", 1)
+        self.plot_type: PlotType = get_plot_type(input_json.get("plot_type", "time_series"))
+        self.plot_title: str = input_json.get("plot_title", "")
+        self.x_axis: PlotAxis = PlotAxis(
+            input_json.get("x_label", ""),
+            parse_range(input_json, "x"),
+            input_json.get("x_ticks_count", None)
+        )
+        self.y_axis: PlotAxis = PlotAxis(
+            input_json.get("y_label", ""),
+            parse_range(input_json, "y"),
+            input_json.get("y_ticks_count", None)
+        )
+        self.seed: int = input_json.get("seed", 0)
+        self.fig_size: tuple[int, int] = input_json.get("figsize", (20, 10))
+        self.plot_colors: list[str] = input_json.get("plot_colors", [])
+        self.figure_export_name: str | None = input_json.get("figure_export_name", None)
+
+
+def parse_range(user_input: dict[str, Any], key: str) -> tuple[float, float] | None:
+    """
+    Parses a range from the user input
+    Args:
+        user_input: the user input dictionary
+        key: the key of the range
+
+    Returns:
+        a tuple containing the minimum and maximum values of the range,
+        or None if either bound is missing
+    """
+
+    if f"{key}_min" not in user_input or f"{key}_max" not in user_input:
+        return None
+
+    return user_input[f"{key}_min"], user_input[f"{key}_max"]
+
+
+def parse_configuration(config_path: str, output_path: str, simulation_path: str) -> SimulationConfig:
+    """
+    Reads the input JSON file and returns a SimulationConfig object
    Args:
+        config_path: the path to the input JSON file
+        output_path: the path to the output folder
+        simulation_path: the path to the simulation folder
+
+    Returns:
+        a SimulationConfig object
+    """
+
+    try:
+        with open(config_path, 'r') as json_file:
+            input_json: dict[str, Any] = load(json_file)
+    except JSONDecodeError:
+        stderr.write(f"Error decoding JSON in file: {config_path}\n")
+        exit(1)
+    except IOError:
+        stderr.write(f"Error reading file: {config_path}\n")
+        exit(1)
+
+    try:
+        return SimulationConfig(input_json, output_path, simulation_path)
+    except ValueError as err:
+        stderr.write(f"Error parsing input JSON: {err}\n")
+        exit(1)
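Reviewer sketch (not part of the patch): a minimal dict that passes the validation in SimulationConfig above. The keys are the ones the parser actually reads; the paths are hypothetical.

    from util.config import SimulationConfig

    cfg = SimulationConfig(
        input_json={
            "metric": "power_draw",
            "multimodel": True,
            "metamodel": True,
            "meta_function": "mean",
            "window_size": 10,
            "plot_type": "time_series",
        },
        output_path="./output",          # hypothetical path
        simulation_path="./simulation",  # hypothetical path
    )
    print(cfg.metric, cfg.window_size, cfg.plot_type)  # power_draw 10 time_series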
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/util.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/util.py
new file mode 100644
index 00000000..067af53f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/util.py
@@ -0,0 +1,76 @@
+from json import JSONDecodeError, load
+from typing import Any
+
+UNIT_FACTORS: dict[int, str] = {
+    -9: 'n',
+    -6: 'μ',
+    -3: 'm',
+    0: '',
+    3: 'k',
+    6: 'M',
+    9: 'G',
+    12: 'T'
+}
+
+SIMULATION_ANALYSIS_FOLDER_NAME = 'simulation-analysis'
+EMISSIONS_ANALYSIS_FOLDER_NAME = 'carbon_emission'
+ENERGY_ANALYSIS_FOLDER_NAME = 'power_draw'
+
+"""
+SIMULATION_DATA_FILE (str): The name of the file containing the simulation data. Enter only the name, not the path
+and not the extension. The data file must be in Parquet format.
+
+✅ Good: "host", "simulation_data", "cats_predictions"
+❌ Wrong: "host.json", "opendc/folder_x/folder_y/data"
+"""
+SIMULATION_DATA_FILE = "host"  # opendc outputs in file host.parquet
+
+
+def adjust_unit(target_unit: str, magnitude: int) -> tuple[str, float]:
+    """
+    Adjusts the unit based on the magnitude provided.
+    Example:
+        adjust_unit('W', 3) -> ('kW', 1000)
+    Args:
+        target_unit: The target unit to adjust.
+        magnitude: The magnitude to adjust the unit by.
+
+    Returns:
+        A tuple containing the adjusted unit and magnitude.
+    """
+
+    result_unit = UNIT_FACTORS.get(magnitude, '') + target_unit
+    result_magnitude = (10 ** magnitude) if magnitude in UNIT_FACTORS else 1
+    return result_unit, result_magnitude
+
+
+def clean_analysis_file(metric: str) -> None:
+    """Truncates the analysis file for the given metric."""
+    analysis_file_path = SIMULATION_ANALYSIS_FOLDER_NAME + "/"
+    if metric == "power_draw":
+        analysis_file_path += ENERGY_ANALYSIS_FOLDER_NAME
+    else:
+        analysis_file_path += EMISSIONS_ANALYSIS_FOLDER_NAME
+    analysis_file_path += "/analysis.txt"
+
+    with open(analysis_file_path, "w") as f:
+        f.write("")
+
+
+def parse_json(json_path: str) -> dict[str, Any]:
+    """
+    Parses a JSON file and returns the dictionary representation.
+    Args:
+        json_path: The path to the JSON file.
+
+    Returns:
+        A dictionary containing the JSON data.
+    """
+
+    try:
+        with open(json_path, 'r') as raw_json:
+            return load(raw_json)
+    except JSONDecodeError:
+        print(f"Error decoding JSON in file: {json_path}")
+        exit(1)
+    except IOError:
+        print(f"Error reading file: {json_path}")
+        exit(1)
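Reviewer sketch (not part of the patch): adjust_unit checked against its own docstring example; magnitude 2 is deliberately absent from UNIT_FACTORS to show the fallback.

    from util.util import adjust_unit

    print(adjust_unit('W', 3))   # ('kW', 1000)
    print(adjust_unit('W', -3))  # ('mW', 0.001)
    print(adjust_unit('W', 2))   # ('W', 1) -- unknown magnitude falls back to the bare unit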
