diff options
| author | Radu Nicolae <rnicolae04@gmail.com> | 2025-06-16 18:01:07 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-06-16 18:01:07 +0200 |
| commit | 0df3d9ced743ac3385dd710c7133a6cf369b051c (patch) | |
| tree | eff5d6d67c275643e229731ba08c5fe7dc4ccd0a /opendc-experiments/opendc-experiments-m3sa/src/main | |
| parent | c7e303ad1b5217e2ff24cee9538ac841d6149706 (diff) | |
integrated M3SA, updated with tests and CpuPowerModels
Diffstat (limited to 'opendc-experiments/opendc-experiments-m3sa/src/main')
21 files changed, 945 insertions, 900 deletions
diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/M3saAnalyzer.kt b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/M3SAAnalyzer.kt index 545ed656..5cc7cb78 100644 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/M3saAnalyzer.kt +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/M3SAAnalyzer.kt @@ -20,38 +20,56 @@ * SOFTWARE. */ -package org.opendc.experiments.m3sa - -import kotlin.io.path.Path +import java.nio.file.Files +import java.nio.file.Paths /** * This constant variable should be changed depending on the root folder that is being run. * PATH_TO_PYTHON_MAIN should point to the main python file, ran when the analysis starts. */ -public val ANALYSIS_SCRIPTS_DIRECTORY: String = "./opendc-experiments/opendc-experiments-m3sa/src/main/python" -public val ABSOLUTE_SCRIPT_PATH: String = - Path("$ANALYSIS_SCRIPTS_DIRECTORY/main.py").toAbsolutePath().normalize().toString() -public val SCRIPT_LANGUAGE: String = Path("$ANALYSIS_SCRIPTS_DIRECTORY/venv/bin/python3").toAbsolutePath().normalize().toString() - public fun m3saAnalyze( outputFolderPath: String, m3saSetupPath: String, + m3saExecPath: String, ) { + // script to run + val scriptPath = + Paths.get(m3saExecPath, "main.py") + .toAbsolutePath() + .normalize() + .toString() + + // look for venv python; if missing, use system python3 + val venvPython = + Paths.get(m3saExecPath, "venv", "bin", "python3") + .toAbsolutePath() + .normalize() + val pythonBin = + if (Files.isRegularFile(venvPython) && Files.isExecutable(venvPython)) { + venvPython.toString() + } else { + "python3" // fallback + } + val process = ProcessBuilder( - SCRIPT_LANGUAGE, - ABSOLUTE_SCRIPT_PATH, - outputFolderPath, + pythonBin, + scriptPath, m3saSetupPath, - ).directory(Path(ANALYSIS_SCRIPTS_DIRECTORY).toFile()) + "$outputFolderPath/raw-output", + "-o", + outputFolderPath, + ) + 
.redirectErrorStream(true) .start() val exitCode = process.waitFor() + val output = process.inputStream.bufferedReader().readText() if (exitCode == 0) { - println("[M3SA says] M3SA operation(s) completed successfully.") + println("[M3SA says] Success:\n$output") } else { - val errors = process.errorStream.bufferedReader().readText() - println("[M3SA says] Exit code $exitCode; Error(s): $errors") + println("[M3SA says] Exit code $exitCode; Output:\n$output") + throw RuntimeException("M3SA analysis failed with exit code $exitCode") } } diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SACli.kt b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SACli.kt index 4fe58d88..51919722 100644 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SACli.kt +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SACli.kt @@ -30,9 +30,8 @@ import com.github.ajalt.clikt.parameters.options.defaultLazy import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.int +import m3saAnalyze import org.opendc.experiments.base.experiment.getExperiment -import org.opendc.experiments.base.runner.runExperiment -import org.opendc.experiments.m3sa.m3saAnalyze import org.opendc.experiments.m3sa.scenario.getOutputFolder import java.io.File @@ -52,35 +51,62 @@ internal class M3SACommand : CliktCommand(name = "experiment") { .file(canBeDir = false, canBeFile = true) .defaultLazy { File("resources/experiment.json") } - /** - * The number of threads to use for parallelism. 
- */ - private val parallelism by option("-p", "--parallelism", help = "number of worker threads") - .int() - .default(Runtime.getRuntime().availableProcessors() - 1) - private val m3saPath by option("-m", "--m3sa-setup-path", help = "path to m3sa setup file") .file(canBeDir = false, canBeFile = true) - .defaultLazy { File("") } + + private val m3saExec by option("-e", "--m3sa-exec-path", help = "path to m3sa executable") + .file(canBeDir = true, canBeFile = false) + .defaultLazy { File("opendc-experiments/opendc-experiments-m3sa/src/main/python/") } + + private val inputIterations by option("-i", "--iterations", help = "number of iterations to run") + .int() + .default(1) override fun run() { - println("The provided m3saPath is $m3saPath") - - val experiment = getExperiment(scenarioPath) - runExperiment(experiment, parallelism) - - if (m3saPath.toString().isNotEmpty()) { - m3saAnalyze( - outputFolderPath = getOutputFolder(scenarioPath), - m3saSetupPath = m3saPath.toString(), - ) - } else { - println( - "\n" + - "===================================================\n" + - "|M3SA path is not provided. Skipping M3SA analysis.|\n" + - "===================================================", - ) + val file = File("analysis.txt") + if (!file.exists()) { + file.createNewFile() } + + var iterations = inputIterations + var currentIteration = 1 + + while (iterations > 0) { + val startTime = System.currentTimeMillis() + val experiment = getExperiment(scenarioPath) + org.opendc.experiments.base.runner.runExperiment(experiment) + val simulationEnd = System.currentTimeMillis() + println("Simulation time: ${(simulationEnd - startTime) / 1000} ms") + + if (m3saPath != null) { + m3saAnalyze( + outputFolderPath = getOutputFolder(scenarioPath), + m3saSetupPath = m3saPath.toString(), + m3saExecPath = m3saExec.toString(), + ) + } else { + println( + "\n" + + "===================================================\n" + + "|M3SA path is not provided. 
Skipping M3SA analysis.|\n" + + "===================================================", + ) + } + + val endTime = System.currentTimeMillis() + println("OpenDC time: ${(simulationEnd - startTime) / 1000.0} s") + println("M3SA time: ${(endTime - simulationEnd) / 1000.0} s") + println("Total operation time: ${(endTime - startTime) / 1000.0} s") + + file.appendText("$currentIteration. OpenDC time: ${(simulationEnd - startTime) / 1000.0} s\n") + file.appendText("$currentIteration. M3SA time: ${(endTime - simulationEnd) / 1000.0} s\n") + file.appendText("$currentIteration. Total operation time: ${(endTime - startTime) / 1000.0} s\n\n") + + iterations -= 1 + currentIteration += 1 + } + + file.appendText("===================================================\n") + println("Finished $scenarioPath") } } diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt index 49bbdb96..9bc7045f 100644 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/kotlin/org/opendc/experiments/m3sa/runner/M3SARunner.kt @@ -25,7 +25,10 @@ package org.opendc.experiments.m3sa.runner import org.opendc.experiments.base.experiment.Scenario +import org.opendc.experiments.base.runner.runScenario import org.opendc.experiments.base.runner.setupOutputFolderStructure +import java.io.File +import java.util.Optional /** * Run scenario when no pool is available for parallel execution @@ -35,9 +38,35 @@ import org.opendc.experiments.base.runner.setupOutputFolderStructure */ public fun runExperiment( experiment: List<Scenario>, - parallelism: Int, + extraSimDataPath: Optional<String>, ) { + val ansiReset = "\u001B[0m" + val ansiGreen = "\u001B[32m" + val ansiBlue = "\u001B[34m" + 
setupOutputFolderStructure(experiment[0].outputFolder) - runExperiment(experiment, parallelism) + var latestScenarioId = experiment.map { it.id }.maxOrNull() ?: 0 + + for (scenario in experiment) { + println( + "\n\n$ansiGreen================================================================================$ansiReset", + ) + println("$ansiBlue Running scenario: ${scenario.name} $ansiReset") + println("$ansiGreen================================================================================$ansiReset") + runScenario( + scenario, + ) + } + + if (extraSimDataPath.isEmpty) return + + for (directory in File(extraSimDataPath.get()).listFiles()!!) { + if (!directory.isDirectory) continue + latestScenarioId += 1 + + val copyPath = "${experiment[0].outputFolder}/raw-output/$latestScenarioId" + File(copyPath).mkdirs() + directory.copyRecursively(File(copyPath), true) + } } diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/.gitignore b/opendc-experiments/opendc-experiments-m3sa/src/main/python/.gitignore new file mode 100644 index 00000000..53c9831b --- /dev/null +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/.gitignore @@ -0,0 +1,3 @@ +sim +venv +__pycache__ diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/Makefile b/opendc-experiments/opendc-experiments-m3sa/src/main/python/Makefile new file mode 100644 index 00000000..c3d83154 --- /dev/null +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/Makefile @@ -0,0 +1,17 @@ +.PHONY: install uninstall clean +SRCS = $(wildcard models/*) $(wildcard util/*) m3sa main.py + +install: $(SRCS) venv + +venv: requirements.txt + python -m venv venv + . venv/bin/activate && pip install -r requirements.txt + ln -s $(PWD)/m3sa ${HOME}/.local/bin/m3sa + @echo "WARNING: M3SA is installed under $(PWD). Do not move the directory." 
+ +uninstall: + rm -rf venv + rm -f ${HOME}/.local/bin/m3sa + +clean: + rm -rf __pycache__ .mypy_cache .pytest_cache diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/input_parser.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/input_parser.py deleted file mode 100644 index cb1bc2b9..00000000 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/python/input_parser.py +++ /dev/null @@ -1,135 +0,0 @@ -import json -import os -import sys -import warnings - - -def read_input(path=""): - """ - Reads and processes the input JSON file from the specified path. Validates the input path, - ensures the file exists, and decodes the JSON content. Switches to the project root directory - before returning the parsed input. - - :param path: The relative path to the input JSON file. - :type path: str - :raises ValueError: If the input path is not provided, file does not exist, or JSON decoding fails. - :return: Parsed JSON content. - :rtype: dict - :side effect: Changes the working directory to the project root. - """ - if not path: - raise ValueError("No input path provided.") - - path = path.strip().strip(',') - - project_root = find_root_dir() - if not project_root: - raise ValueError("Project root not found.") - - full_path = os.path.join(project_root, path) - - if not os.path.exists(full_path): - raise ValueError(f"File does not exist: {full_path}") - - try: - with open(full_path, 'r') as raw_json: - input_json = json.load(raw_json) - except json.JSONDecodeError: - raise ValueError("Failed to decode JSON.") - except IOError: - raise ValueError("MultiModel's parser says: Error opening file.") - - switch_to_root_dir() - - # Validate and apply defaults - input_json = parse_input(input_json) - return input_json - - -def parse_input(input_json): - """ - Validates and applies default values to the input JSON content. Ensures required fields are present - and raises warnings or errors for missing or invalid values. 
- - :param input_json: The input JSON content. - :type input_json: dict - :raises ValueError: If required fields are missing or invalid values are provided. - :return: Validated and processed JSON content with defaults applied. - :rtype: dict - """ - - DEFAULTS = { - "multimodel": True, - "metamodel": False, - "window_size": 1, - "window_function": "mean", - "meta_function": "mean", - "samples_per_minute": 0, - "current_unit": "", - "unit_scaling_magnitude": 1, - "plot_type": "time_series", - "plot_title": "", - "x_label": "", - "y_label": "", - "seed": 0, - "y_ticks_count": None, - "x_ticks_count": None, - "y_min": None, - "y_max": None, - "x_min": None, - "x_max": None, - } - - # Apply default values where not specified - for key, default_value in DEFAULTS.items(): - if key not in input_json: - input_json[key] = default_value - - # Special handling for required fields without default values - if "metric" not in input_json: - raise ValueError("Required field 'metric' is missing.") - - if ("meta_function" not in input_json) and input_json["metamodel"]: - raise ValueError("Required field 'meta_function' is missing. Please select between 'mean' and 'median'. Alternatively," - "disable metamodel in the config file.") - - if input_json["meta_function"] not in ["mean", "median", "meta_equation1", "equation2", "equation3"]: - raise ValueError("Invalid value for meta_function. Please select between 'mean', 'median', !!!!!!!to be updated in the end!!!!!!!!.") - - # raise a warning - if not input_json["multimodel"] and input_json["metamodel"]: - warnings.warn("Warning: Cannot have a Meta-Model without a Multi-Model. No computation made.") - - return input_json - - -def find_root_dir(): - """ - Searches for the project root directory by looking for a 'README.md' file in the current - and parent directories. - - :return: The path to the project root directory if found, otherwise None. 
- :rtype: str or None - """ - current_dir = os.path.dirname(os.path.abspath(__file__)) - root = os.path.abspath(os.sep) - while current_dir and current_dir != root: - if os.path.exists(os.path.join(current_dir, 'README.md')): - return current_dir - current_dir = os.path.dirname(current_dir) - return None - - -def switch_to_root_dir(): - """ - Switches the current working directory to the project root directory. Exits the program if the - root directory is not found. - - :side effect: Changes the current working directory or exits the program. - """ - root_dir = find_root_dir() - if root_dir: - os.chdir(root_dir) - else: - print("Failed to switch to root directory.") - sys.exit(1) diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/m3sa b/opendc-experiments/opendc-experiments-m3sa/src/main/python/m3sa new file mode 100755 index 00000000..06ecaaea --- /dev/null +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/m3sa @@ -0,0 +1,19 @@ +#!/bin/sh + +my_path() { + cd -P -- "$(dirname -- "$(realpath "$(command -v -- "$0")")")" && pwd -P +} + +SRC_PATH="$(my_path)" +VENV_PATH="$SRC_PATH/venv" + +if [ ! -d "$VENV_PATH" ]; then + python3 -m venv "$VENV_PATH" || exit 1 + pip install --upgrade pip || exit 1 + pip install -r "$SRC_PATH/requirements.txt" || exit 1 +fi + +. 
"$VENV_PATH/bin/activate" +python3 "$SRC_PATH/main.py" "$@" + + diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/main.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/main.py index 11ee836d..8f7b82ec 100644 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/python/main.py +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/main.py @@ -1,19 +1,29 @@ -from os import sys +from models import MultiModel, MetaModel +from util import SimulationConfig, parse_configuration +from argparse import ArgumentParser, Namespace -from input_parser import read_input -from models.MetaModel import MetaModel -from models.MultiModel import MultiModel + +def arg_parser() -> Namespace: + parser = ArgumentParser(prog="m3sa", description="Multi-Model Simulation and Analysis") + parser.add_argument("config", help="Path to the JSON configuration file", type=str) + parser.add_argument("simulation", help="Path to the simulation directory", type=str) + parser.add_argument("-o", "--output", help="Path to the output directory", type=str, nargs="?") + return parser.parse_args() def main(): - multimodel = MultiModel( - user_input=read_input(sys.argv[2]), - path=sys.argv[1], - ) + arg_input: Namespace = arg_parser() + output_path: str = arg_input.output if arg_input.output else "output" + simulation_path: str = arg_input.simulation + simulation_config: SimulationConfig = parse_configuration(arg_input.config, output_path, simulation_path) - multimodel.generate_plot() + multi_model: MultiModel = MultiModel(config=simulation_config) + multi_model.generate_plot() - MetaModel(multimodel) + if simulation_config.is_metamodel: + meta_model: MetaModel = MetaModel(multi_model) + meta_model.compute() + meta_model.output() if __name__ == "__main__": diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/Model.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/Model.py deleted file mode 100644 index 
f60f0bb0..00000000 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/Model.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -A model is the output of simulator. It contains the data the simulator output, under a certain topology, seed, -workload, datacenter configuration, etc. A model is further used in the analyzer as part of the MultiModel class, -and further in the MetaModel class. - -:param sim: the simulation data of the model -""" -import json -from dataclasses import dataclass, field - -@dataclass -class Model: - """ - Represents a single simulation output containing various data metrics collected under specific simulation conditions. - A Model object stores raw and processed simulation data and is designed to interact with higher-level structures like - MultiModel and MetaModel for complex data analysis. - - Attributes: - raw_sim_data (list): Initial raw data from the simulator output. - processed_sim_data (list): Data derived from raw_sim_data after applying certain processing operations like aggregation or smoothing. - cumulative_time_series_values (list): Stores cumulative data values useful for time series analysis. - id (int): Unique identifier for the model, typically used for tracking and referencing within analysis tools. - path (str): Base path for storing or accessing related data files. - cumulated (float): Cumulative sum of processed data, useful for quick summaries and statistical analysis. - experiment_name (str): A descriptive name for the experiment associated with this model, potentially extracted from external metadata. - margins_of_error (list): Stores error margins associated with the data, useful for uncertainty analysis. - topologies (list): Describes the network or system topologies used during the simulation. - workloads (list): Lists the types of workloads applied during the simulation, affecting the simulation's applicability and scope. 
- allocation_policies (list): Details the resource allocation policies used, which influence the simulation outcomes. - carbon_trace_paths (list): Paths to data files containing carbon output or usage data, important for environmental impact studies. - - Methods: - parse_trackr(): Reads additional configuration and metadata from a JSON file named 'trackr.json', enhancing the model with detailed context information. - - Usage: - Model objects are typically instantiated with raw data from simulation outputs and an identifier. After instantiation, - the 'parse_trackr' method can be called to load additional experimental details from a corresponding JSON file. - """ - - path: str - raw_sim_data: list - id: int - processed_sim_data: list = field(default_factory=list) - cumulative_time_series_values: list = field(default_factory=list) - cumulated: float = 0.0 - experiment_name: str = "" - margins_of_error: list = field(default_factory=list) - topologies: list = field(default_factory=list) - workloads: list = field(default_factory=list) - allocation_policies: list = field(default_factory=list) - carbon_trace_paths: list = field(default_factory=list) - - def parse_trackr(self): - """ - Parses the 'trackr.json' file located in the model's base path to extract and store detailed experimental metadata. - This method enhances the model with comprehensive contextual information about the simulation environment. - - :return: None - :side effect: Updates model attributes with data from the 'trackr.json' file, such as experiment names, topologies, and policies. - :raises FileNotFoundError: If the 'trackr.json' file does not exist at the specified path. - :raises json.JSONDecodeError: If there is an error parsing the JSON data. 
- """ - trackr_path = self.path + "/trackr.json" - with open(trackr_path) as f: - trackr = json.load(f) - self.experiment_name = trackr.get(self.id, {}).get('name', "") - self.topologies = trackr.get(self.id, {}).get('topologies', []) - self.workloads = trackr.get(self.id, {}).get('workloads', []) - self.allocation_policies = trackr.get(self.id, {}).get('allocationPolicies', []) - self.carbon_trace_paths = trackr.get(self.id, {}).get('carbonTracePaths', []) diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MultiModel.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MultiModel.py deleted file mode 100644 index 17a92765..00000000 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MultiModel.py +++ /dev/null @@ -1,501 +0,0 @@ -import matplotlib.pyplot as plt -import numpy as np -import os -import pyarrow.parquet as pq -import time -from matplotlib.ticker import MaxNLocator, FuncFormatter - -from simulator_specifics import * -from .MetaModel import MetaModel -from .Model import Model - - -def is_meta_model(model): - """ - Check if the given model is a MetaModel based on its ID. A metamodel will always have an id of -101. - - Args: - model (Model): The model to check. - - Returns: - bool: True if model is MetaModel, False otherwise. - """ - return model.id == MetaModel.META_MODEL_ID - - -class MultiModel: - """ - Handles multiple simulation models, aggregates their data based on user-defined parameters, - and generates plots and statistics. - - Attributes: - user_input (dict): Configuration dictionary containing user settings for model processing. - path (str): The base directory path where output files and analysis results are stored. - window_size (int): The size of the window for data aggregation, which affects how data smoothing and granularity are handled. - models (list of Model): A list of Model instances that store the simulation data. 
- metric (str): The specific metric to be analyzed and plotted, as defined by the user. - measure_unit (str): The unit of measurement for the simulation data, adjusted according to the user's specifications. - output_folder_path (str): Path to the folder where output files are saved. - raw_output_path (str): Directory path where raw simulation data is stored. - analysis_file_path (str): Path to the file where detailed analysis results are recorded. - plot_type (str): The type of plot to generate, which can be 'time_series', 'cumulative', or 'cumulative_time_series'. - plot_title (str): The title of the plot. - x_label (str), y_label (str): Labels for the x and y axes of the plot. - x_min (float), x_max (float), y_min (float), y_max (float): Optional parameters to define axis limits for the plots. - - Methods: - parse_user_input(window_size): Parses and sets the class attributes based on the provided user input. - adjust_unit(): Adjusts the unit of measurement based on user settings, applying appropriate metric prefixes. - set_paths(): Initializes the directory paths for storing outputs and analysis results. - init_models(): Reads simulation data from Parquet files and initializes Model instances. - compute_windowed_aggregation(): Processes the raw data by applying a windowed aggregation function for smoothing. - generate_plot(): Orchestrates the generation of the specified plot type by calling the respective plotting functions. - generate_time_series_plot(): Generates a time series plot of the aggregated data. - generate_cumulative_plot(): Creates a bar chart showing cumulative data for each model. - generate_cumulative_time_series_plot(): Produces a plot that displays cumulative data over time for each model. - save_plot(): Saves the generated plot to a PDF file in the specified directory. - output_stats(): Writes detailed statistics of the simulation to an analysis file for record-keeping. 
- mean_of_chunks(np_array, window_size): Calculates the mean of data segments for smoothing and processing. - get_cumulative_limits(model_sums): Determines appropriate x-axis limits for cumulative plots based on the model data. - - Usage: - To use this class, instantiate it with a dictionary of user settings, a path for outputs, and optionally a window size. - Call the `generate_plot` method to process the data and generate plots as configured by the user. - """ - - def __init__(self, user_input, path, window_size=-1): - """ - Initializes the MultiModel with provided user settings and prepares the environment. - - :param user_input (dict): Configurations and settings from the user. - :param path (str): Path where output and analysis will be stored. - :param window_size (int): The size of the window to aggregate data; uses user input if -1. - :return: None - """ - - self.starting_time = time.time() - self.end_time = None - self.workload_time = None - - self.user_input = user_input - - self.metric = None - self.measure_unit = None - self.path = path - self.models = [] - - self.folder_path = None - self.output_folder_path = None - self.raw_output_path = None - self.analysis_file_path = None - self.unit_scaling = 1 - self.window_size = -1 - self.window_function = "median" - self.max_model_len = 0 - self.seed = 0 - - self.plot_type = None - self.plot_title = None - self.x_label = None - self.y_label = None - self.x_min = None - self.x_max = None - self.y_min = None - self.y_max = None - self.plot_path = None - - self.parse_user_input(window_size) - self.set_paths() - self.init_models() - - self.compute_windowed_aggregation() - - def parse_user_input(self, window_size): - """ - Parses and sets attributes based on user input. - - :param window_size (int): Specified window size for data aggregation, defaults to user_input if -1. 
- :return: None - """ - if window_size == -1: - self.window_size = self.user_input["window_size"] - else: - self.window_size = window_size - self.metric = self.user_input["metric"] - self.measure_unit = self.adjust_unit() - self.window_function = self.user_input["window_function"] - self.seed = self.user_input["seed"] - - self.plot_type = self.user_input["plot_type"] - self.plot_title = self.user_input["plot_title"] - if self.user_input["x_label"] == "": - self.x_label = "Samples" - else: - self.x_label = self.user_input["x_label"] - - if self.user_input["y_label"] == "": - self.y_label = self.metric + " [" + self.measure_unit + "]" - else: - self.y_label = self.user_input["y_label"] - - self.y_min = self.user_input["y_min"] - self.y_max = self.user_input["y_max"] - self.x_min = self.user_input["x_min"] - self.x_max = self.user_input["x_max"] - - def adjust_unit(self): - """ - Adjusts the unit of measurement according to the scaling magnitude specified by the user. - This method translates the given measurement scale into a scientifically accepted metric prefix. - - :return str: The metric prefixed by the appropriate scale (e.g., 'kWh' for kilo-watt-hour if the scale is 3). - :raise ValueError: If the unit scaling magnitude provided by the user is not within the accepted range of scaling factors. - """ - prefixes = ['n', 'μ', 'm', '', 'k', 'M', 'G', 'T'] - scaling_factors = [-9, -6, -3, 1, 3, 6, 9] - given_metric = self.user_input["current_unit"] - self.unit_scaling = self.user_input["unit_scaling_magnitude"] - - if self.unit_scaling not in scaling_factors: - raise ValueError( - "Unit scaling factor not found. 
Please enter a valid unit from [-9, -6, -3, 1, 3, 6, 9].") - - if self.unit_scaling == 1: - return given_metric - - for i in range(len(scaling_factors)): - if self.unit_scaling == scaling_factors[i]: - self.unit_scaling = 10 ** self.unit_scaling - result = prefixes[i] + given_metric - return result - - def set_paths(self): - """ - Configures and initializes the directory paths for output and analysis based on the base directory provided. - This method sets paths for the raw output and detailed analysis results, ensuring directories are created if - they do not already exist, and prepares a base file for capturing analytical summaries. - - :return: None - :side effect: Creates necessary directories and files for output and analysis. - """ - self.output_folder_path = os.getcwd() + "/" + self.path - self.raw_output_path = os.getcwd() + "/" + self.path + "/raw-output" - self.analysis_file_path = os.getcwd() + "/" + self.path + "/simulation-analysis/" - os.makedirs(self.analysis_file_path, exist_ok=True) - self.analysis_file_path = os.path.join(self.analysis_file_path, "analysis.txt") - if not os.path.exists(self.analysis_file_path): - with open(self.analysis_file_path, "w") as f: - f.write("Analysis file created.\n") - - def init_models(self): - """ - Initializes models from the simulation output stored in Parquet files. This method reads each Parquet file, - processes the relevant data, and initializes Model instances which are stored in the model list. - - :return: None - :raise ValueError: If the unit scaling has not been set prior to model initialization. 
- """ - model_id = 0 - - for simulation_folder in os.listdir(self.raw_output_path): - if simulation_folder == "metamodel": - continue - path_of_parquet_file = f"{self.raw_output_path}/{simulation_folder}/seed={self.seed}/{SIMULATION_DATA_FILE}.parquet" - parquet_file = pq.read_table(path_of_parquet_file).to_pandas() - raw = parquet_file.select_dtypes(include=[np.number]).groupby("timestamp") - raw = raw[self.metric].sum().values - - if self.unit_scaling is None: - raise ValueError("Unit scaling factor is not set. Please ensure it is set correctly.") - - raw = np.divide(raw, self.unit_scaling) - - if self.user_input["samples_per_minute"] > 0: - MINUTES_IN_DAY = 1440 - self.workload_time = len(raw) * self.user_input["samples_per_minute"] / MINUTES_IN_DAY - - model = Model(raw_sim_data=raw, id=model_id, path=self.output_folder_path) - self.models.append(model) - model_id += 1 - - self.max_model_len = min([len(model.raw_sim_data) for model in self.models]) - - def compute_windowed_aggregation(self): - """ - Applies a windowed aggregation function to each model's dataset. This method is typically used for smoothing - or reducing data granularity. It involves segmenting the dataset into windows of specified size and applying - an aggregation function to each segment. - - :return: None - :side effect: Modifies each model's processed_sim_data attribute to contain aggregated data. - """ - if self.plot_type != "cumulative": - for model in self.models: - numeric_values = model.raw_sim_data - model.processed_sim_data = self.mean_of_chunks(numeric_values, self.window_size) - - def generate_plot(self): - """ - Creates and saves plots based on the processed data from multiple models. This method determines - the type of plot to generate based on user input and invokes the appropriate plotting function. - - The plotting options supported are 'time_series', 'cumulative', and 'cumulative_time_series'. 
- Depending on the type specified, this method delegates to specific plot-generating functions. - - :return: None - :raises ValueError: If the plot type specified is not recognized or supported by the system. - :side effect: - - Generates and saves a plot to the file system. - - Updates the plot attributes based on the generated plot. - - Displays the plot on the matplotlib figure canvas. - """ - plt.figure(figsize=(12, 10)) - plt.xticks(size=22) - plt.yticks(size=22) - plt.ylabel(self.y_label, size=26) - plt.xlabel(self.x_label, size=26) - plt.title(self.plot_title, size=26) - plt.grid() - - formatter = FuncFormatter(lambda x, _: '{:,}'.format(int(x)) if x >= 1000 else int(x)) - ax = plt.gca() - ax.xaxis.set_major_formatter(formatter) - # ax.yaxis.set_major_formatter(formatter) yaxis has formatting issues - to solve in a future iteration - - if self.user_input['x_ticks_count'] is not None: - ax = plt.gca() - ax.xaxis.set_major_locator(MaxNLocator(self.user_input['x_ticks_count'])) - - if self.user_input['y_ticks_count'] is not None: - ax = plt.gca() - ax.yaxis.set_major_locator(MaxNLocator(self.user_input['y_ticks_count'])) - - self.set_x_axis_lim() - self.set_y_axis_lim() - - if self.plot_type == "time_series": - self.generate_time_series_plot() - elif self.plot_type == "cumulative": - self.generate_cumulative_plot() - elif self.plot_type == "cumulative_time_series": - self.generate_cumulative_time_series_plot() - else: - raise ValueError( - "Plot type not recognized. Please enter a valid plot type. The plot can be either " - "'time_series', 'cumulative', or 'cumulative_time_series'." - ) - - plt.tight_layout() - plt.subplots_adjust(right=0.85) - plt.legend(fontsize=12, bbox_to_anchor=(1, 1)) - self.save_plot() - self.output_stats() - - def generate_time_series_plot(self): - """ - Plots time series data for each model. This function iterates over each model, applies the defined - windowing function to smooth the data, and plots the resulting series. 
- - :return: None - :side effect: Plots are displayed on the matplotlib figure canvas. - """ - for model in self.models: - label = "Meta-Model" if is_meta_model(model) else "Model " + str(model.id) - if is_meta_model(model): - repeated_means = np.repeat(means, self.window_size)[:len(model.processed_sim_data) * self.window_size] - plt.plot( - repeated_means, - drawstyle='steps-mid', - label=label, - color="red", - linestyle="--", - marker="o", - markevery=max(1, len(repeated_means) // 50), - linewidth=2 - ) - else: - means = self.mean_of_chunks(model.raw_sim_data, self.window_size) - repeated_means = np.repeat(means, self.window_size)[:len(model.raw_sim_data)] - plt.plot(repeated_means, drawstyle='steps-mid', label=label) - - def generate_cumulative_plot(self): - """ - Generates a horizontal bar chart showing cumulative data for each model. This function - aggregates total values per model and displays them in a bar chart, providing a visual - comparison of total values across models. - - :return: None - :side effect: Plots are displayed on the matplotlib figure canvas. - """ - plt.xlim(self.get_cumulative_limits(model_sums=self.sum_models_entries())) - plt.ylabel("Model ID", size=20) - plt.xlabel("Total " + self.metric + " [" + self.measure_unit + "]") - plt.yticks(range(len(self.models)), [model.id for model in self.models]) - plt.grid(False) - - cumulated_energies = self.sum_models_entries() - for i, model in enumerate(self.models): - label = "Meta-Model" if is_meta_model(model) else "Model " + str(model.id) - if is_meta_model(model): - plt.barh(label=label, y=i, width=cumulated_energies[i], color="red") - else: - plt.barh(label=label, y=i, width=cumulated_energies[i]) - plt.text(cumulated_energies[i], i, str(cumulated_energies[i]), ha='left', va='center', size=26) - - def generate_cumulative_time_series_plot(self): - """ - Generates a plot showing the cumulative data over time for each model. 
This visual representation is - useful for analyzing trends and the accumulation of values over time. - - :return: None - :side effect: Displays the cumulative data over time on the matplotlib figure canvas. - """ - self.compute_cumulative_time_series() - - for model in self.models: - if is_meta_model(model): - cumulative_repeated = np.repeat(model.cumulative_time_series_values, self.window_size)[ - :len(model.processed_sim_data) * self.window_size] - plt.plot( - cumulative_repeated, - drawstyle='steps-mid', - label=("Meta-Model"), - color="red", - linestyle="--", - marker="o", - markevery=max(1, len(cumulative_repeated) // 10), - linewidth=3 - ) - else: - cumulative_repeated = np.repeat(model.cumulative_time_series_values, self.window_size)[ - :len(model.raw_sim_data)] - plt.plot(cumulative_repeated, drawstyle='steps-mid', label=("Model " + str(model.id))) - - def compute_cumulative_time_series(self): - """ - Computes the cumulative sum of processed data over time for each model, storing the result for use in plotting. - - :return: None - :side effect: Updates each model's 'cumulative_time_series_values' attribute with the cumulative sums. - """ - for model in self.models: - cumulative_array = [] - _sum = 0 - for value in model.processed_sim_data: - _sum += value - cumulative_array.append(_sum * self.window_size) - model.cumulative_time_series_values = cumulative_array - - def save_plot(self): - """ - Saves the current plot to a PDF file in the specified directory, constructing the file path from the - plot attributes and ensuring that the directory exists before saving. - - :return: None - :side effect: Creates or overwrites a PDF file containing the plot in the designated folder. 
- """ - folder_prefix = self.output_folder_path + "/simulation-analysis/" + self.metric + "/" - self.plot_path = folder_prefix + self.plot_type + "_plot_multimodel_metric=" + self.metric + "_window=" + str( - self.window_size) + ".pdf" - plt.savefig(self.plot_path) - - def set_x_axis_lim(self): - """ - Sets the x-axis limits for the plot based on user-defined minimum and maximum values. If values - are not specified, the axis limits will default to encompassing all data points. - - :return: None - :side effect: Adjusts the x-axis limits of the current matplotlib plot. - """ - if self.x_min is not None: - plt.xlim(left=self.x_min) - - if self.x_max is not None: - plt.xlim(right=self.x_max) - - def set_y_axis_lim(self): - """ - Dynamically sets the y-axis limits to be slightly larger than the range of the data, enhancing - the readability of the plot by ensuring all data points are comfortably within the view. - - :return: None - :side effect: Adjusts the y-axis limits of the current matplotlib plot. - """ - if self.y_min is not None: - plt.ylim(bottom=self.y_min) - if self.y_max is not None: - plt.ylim(top=self.y_max) - - def sum_models_entries(self): - """ - Computes the total values from each model for use in cumulative plotting. This method aggregates - the data across all models and prepares it for cumulative display. - - :return: List of summed values for each model, useful for plotting and analysis. - """ - models_sums = [] - for (i, model) in enumerate(self.models): - if is_meta_model(model): - models_sums.append(model.cumulated) - else: - cumulated_energy = model.raw_sim_data.sum() - cumulated_energy = round(cumulated_energy, 2) - models_sums.append(cumulated_energy) - - return models_sums - - def output_stats(self): - """ - Records and writes detailed simulation statistics to an analysis file. This includes time stamps, - performance metrics, and other relevant details. 
- - :return: None - :side effect: Appends detailed simulation statistics to an existing file for record-keeping and analysis. - """ - self.end_time = time.time() - with open(self.analysis_file_path, "a") as f: - f.write("\n\n========================================\n") - f.write("Simulation made at " + time.strftime("%Y-%m-%d %H:%M:%S") + "\n") - f.write("Metric: " + self.metric + "\n") - f.write("Unit: " + self.measure_unit + "\n") - f.write("Window size: " + str(self.window_size) + "\n") - f.write("Sample count in raw sim data: " + str(self.max_model_len) + "\n") - f.write("Computing time " + str(round(self.end_time - self.starting_time, 1)) + "s\n") - if (self.user_input["samples_per_minute"] > 0): - f.write("Workload time: " + str(round(self.workload_time, 2)) + " days\n") - f.write("Plot path" + self.plot_path + "\n") - f.write("========================================\n") - - def mean_of_chunks(self, np_array, window_size): - """ - Calculates the mean of data within each chunk for a given array. This method helps in smoothing the data by - averaging over specified 'window_size' segments. - - :param np_array (np.array): Array of numerical data to be chunked and averaged. - :param window_size (int): The size of each segment to average over. - :return: np.array: An array of mean values for each chunk. - :side effect: None - """ - if window_size == 1: - return np_array - - chunks = [np_array[i:i + window_size] for i in range(0, len(np_array), window_size)] - means = [np.mean(chunk) for chunk in chunks] - return np.array(means) - - def get_cumulative_limits(self, model_sums): - """ - Calculates the appropriate x-axis limits for cumulative plots based on the summarized data from each model. - - :param model_sums (list of float): The total values for each model. - :return: tuple: A tuple containing the minimum and maximum x-axis limits. 
- """ - axis_min = min(model_sums) * 0.9 - axis_max = max(model_sums) * 1.1 - - if self.user_input["x_min"] is not None: - axis_min = self.user_input["x_min"] - if self.user_input["x_max"] is not None: - axis_max = self.user_input["x_max"] - - return [axis_min * 0.9, axis_max * 1.1] diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/__init__.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/__init__.py new file mode 100644 index 00000000..e2d5aaee --- /dev/null +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/__init__.py @@ -0,0 +1,3 @@ +from .model import Model +from .multi_model import MultiModel +from .meta_model import MetaModel diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MetaModel.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/meta_model.py index 49930d25..a6d0fded 100644 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/MetaModel.py +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/meta_model.py @@ -1,8 +1,8 @@ -import numpy as np import os import pandas as pd - -from .Model import Model +from models import Model, MultiModel +from typing import Callable +from util import PlotType class MetaModel: @@ -20,43 +20,32 @@ class MetaModel: function_map (dict): Mapping of aggregation function names to function implementations. """ - META_MODEL_ID = -101 + META_MODEL_ID = 'M' - def __init__(self, multimodel, meta_function=None): + def __init__(self, multi_model: MultiModel, meta_function: Callable[[any], float] = None): """ Initializes the Metamodel with a MultiModel instance and prepares aggregation functions based on configuration. - :param multimodel: MultiModel instance containing the models to aggregate. + :param multi_model: MultiModel instance containing the models to aggregate. :raise ValueError: If metamodel functionality is not enabled in the configuration. 
""" - if not multimodel.user_input.get('metamodel', False): + if not multi_model.config.is_metamodel: raise ValueError("Metamodel is not enabled in the config file") - self.function_map = { - 'mean': self.mean, - 'median': self.median, - 'meta_equation1': self.meta_equation1, - } - - self.multi_model = multimodel - self.meta_model = Model( + self.multi_model = multi_model + self.meta_model: Model = Model( raw_sim_data=[], - id=self.META_MODEL_ID, - path=self.multi_model.output_folder_path + identifier=self.META_MODEL_ID, ) - if meta_function is not None: - self.meta_function = meta_function - else: - self.meta_function = self.function_map.get(multimodel.user_input['meta_function'], self.mean) + self.meta_function: Callable[ + [any], float] = self.multi_model.config.meta_function if meta_function is None else meta_function self.min_raw_model_len = min([len(model.raw_sim_data) for model in self.multi_model.models]) self.min_processed_model_len = min([len(model.processed_sim_data) for model in self.multi_model.models]) self.number_of_models = len(self.multi_model.models) - self.compute() - self.output() - def output(self): + def output(self) -> None: """ Generates outputs by plotting the aggregated results and exporting the metamodel data to a file. :return: None @@ -65,34 +54,32 @@ class MetaModel: self.plot() self.output_metamodel() - def compute(self): + def compute(self) -> None: """ Computes aggregated data based on the specified plot type from the configuration. :raise ValueError: If an unsupported plot type is specified in the configuration. 
""" - if self.multi_model.plot_type == 'time_series': - self.compute_time_series() - elif self.multi_model.plot_type == 'cumulative': - self.compute_cumulative() - elif self.multi_model.plot_type == 'cumulative_time_series': - self.compute_cumulative_time_series() - else: - raise ValueError("Invalid plot type in config file") + match self.multi_model.config.plot_type: + case PlotType.TIME_SERIES: + self.compute_time_series() + case PlotType.CUMULATIVE: + self.compute_cumulative() + case PlotType.CUMULATIVE_TIME_SERIES: + self.compute_cumulative_time_series() - def plot(self): + def plot(self) -> None: """ Plots the aggregated data according to the specified plot type from the configuration. :raise ValueError: If an unsupported plot type is specified. """ - if self.multi_model.plot_type == 'time_series': - self.plot_time_series() - elif self.multi_model.plot_type == 'cumulative': - self.plot_cumulative() - elif self.multi_model.plot_type == 'cumulative_time_series': - self.plot_cumulative_time_series() - else: - raise ValueError("Invalid plot type in config file") + match self.multi_model.config.plot_type: + case PlotType.TIME_SERIES: + self.plot_time_series() + case PlotType.CUMULATIVE: + self.plot_cumulative() + case PlotType.CUMULATIVE_TIME_SERIES: + self.plot_cumulative_time_series() def compute_time_series(self): """ @@ -102,8 +89,8 @@ class MetaModel: """ for i in range(0, self.min_processed_model_len): data_entries = [] - for j in range(self.number_of_models): - data_entries.append(self.multi_model.models[j].processed_sim_data[i]) + for model in self.multi_model.models: + data_entries.append(model.processed_sim_data[i]) self.meta_model.processed_sim_data.append(self.meta_function(data_entries)) self.meta_model.raw_sim_data = self.meta_model.processed_sim_data @@ -122,14 +109,14 @@ class MetaModel: :return: None :side effect: Updates the meta_model's cumulative data with aggregated results. 
""" - for i in range(0, self.min_raw_model_len): data_entries = [] - for j in range(self.number_of_models): - sim_data = self.multi_model.models[j].raw_sim_data + for model in self.multi_model.models: + sim_data = model.raw_sim_data ith_element = sim_data[i] data_entries.append(ith_element) - self.meta_model.cumulated += self.mean(data_entries) + self.meta_model.cumulated += self.meta_function(data_entries) + self.meta_model.cumulated = round(self.meta_model.cumulated, 2) def plot_cumulative(self): @@ -149,8 +136,8 @@ class MetaModel: """ for i in range(0, self.min_processed_model_len): data_entries = [] - for j in range(self.number_of_models): - data_entries.append(self.multi_model.models[j].processed_sim_data[i]) + for model in self.multi_model.models: + data_entries.append(model.processed_sim_data[i]) self.meta_model.processed_sim_data.append(self.meta_function(data_entries)) def plot_cumulative_time_series(self): @@ -168,47 +155,18 @@ class MetaModel: :return: None :side effect: Writes data to a parquet file at the specified directory path. """ - directory_path = os.path.join(self.multi_model.output_folder_path, "raw-output/metamodel/seed=0") - os.makedirs(directory_path, exist_ok=True) - current_path = os.path.join(directory_path, f"{self.multi_model.metric}.parquet") - df = pd.DataFrame({'processed_sim_data': self.meta_model.processed_sim_data}) - df.to_parquet(current_path, index=False) + directory_path = os.path.join(self.multi_model.config.output_path, "raw-output/metamodel/seed=0") + try: + os.makedirs(directory_path, exist_ok=True) + except OSError as e: + print(f"Error creating directory: {e}") + exit(1) - def mean(self, chunks): - """ - Calculates the mean of a list of numerical data. - - :param chunks (list): The data over which to calculate the mean. - :return: float: The mean of the provided data. 
- """ - return np.mean(chunks) + current_path = os.path.join(directory_path, f"{self.multi_model.config.metric}.parquet") + minimum = min(len(self.multi_model.timestamps), len(self.meta_model.processed_sim_data)) - def median(self, chunks): - """ - Calculates the median of a list of numerical data. - - :param chunks (list): The data over which to calculate the median. - :return: float: The median of the provided data. - """ - return np.median(chunks) - - def meta_equation1(self, chunks): - """ - Calculates a weighted mean where the weights are inversely proportional to the absolute difference from the median value. - :param chunks (list): Data chunks from which to calculate the weighted mean. - :return: float: The calculated weighted mean. - """ - - """Attempt 1""" - # median_val = np.median(chunks) - # proximity_weights = 1 / (1 + np.abs(chunks - median_val)) # Avoid division by zero - # weighted_mean = np.sum(proximity_weights * chunks) / np.sum(proximity_weights) - # return weighted_mean - - """Attempt 2 Inter-Quartile Mean (same accuracy as mean)""" - # sorted_preds = np.sort(chunks, axis=0) - # Q1 = int(np.floor(0.25 * len(sorted_preds))) - # Q3 = int(np.floor(0.75 * len(sorted_preds))) - # - # iqm = np.mean(sorted_preds[Q1:Q3], axis=0) - # return iqm + df = pd.DataFrame({ + "timestamp": self.multi_model.timestamps[:minimum], + self.multi_model.config.metric: self.meta_model.processed_sim_data[:minimum] + }) + df.to_parquet(current_path, index=False) diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/model.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/model.py new file mode 100644 index 00000000..bfffd090 --- /dev/null +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/model.py @@ -0,0 +1,32 @@ +""" +A model is the output of simulator. It contains the data the simulator output, under a certain topology, seed, +workload, datacenter configuration, etc. 
A model is further used in the analyzer as part of the MultiModel class, +and further in the MetaModel class. + +:param sim: the simulation data of the model +""" +import json + + +class Model: + """ + Represents a single simulation output containing various data metrics collected under specific simulation conditions. + A Model object stores raw and processed simulation data and is designed to interact with higher-level structures like + MultiModel and MetaModel for complex data analysis. + """ + + def __init__(self, raw_sim_data, identifier: str): + self.raw_sim_data = raw_sim_data + self.id: str = str(identifier) + self.processed_sim_data = [] + self.cumulative_time_series_values = [] + self.cumulated: float = 0.0 + self.experiment_name: str = "" + self.margins_of_error = [] + self.topologies = [] + self.workloads = [] + self.allocation_policies = [] + self.carbon_trace_paths = [] + + def is_meta_model(self) -> bool: + return self.id == "M" diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py new file mode 100644 index 00000000..4f993fee --- /dev/null +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/models/multi_model.py @@ -0,0 +1,410 @@ +import matplotlib.pyplot as plt +import numpy as np +import os +import pyarrow.parquet as pq +from time import time, strftime +from matplotlib.ticker import MaxNLocator, FuncFormatter +from matplotlib.ticker import AutoMinorLocator +from typing import IO +from textwrap import dedent +from models import Model +from util import SimulationConfig, adjust_unit, PlotType, SIMULATION_DATA_FILE + + +class MultiModel: + """ + Handles multiple simulation models, aggregates their data based on user-defined parameters, + and generates plots and statistics. + + Attributes: + window_size (int): The size of the window for data aggregation, which affects how data smoothing and granularity are handled. 
+ models (list of Model): A list of Model instances that store the simulation data. + measure_unit (str): The unit of measurement for the simulation data, adjusted according to the user's specifications. + unit_scaling (int): The scaling factor applied to the unit of measurement. + max_model_len (int): The length of the shortest model's raw data, used for consistency in processing. + plot_path (str): The path where the generated plot will be saved. + analysis_file (IO): The file object for writing detailed analysis statistics. + COLOR_PALETTE (list of str): A list of color codes for plotting multiple models. + + Methods: + parse_user_input(window_size): Parses and sets the class attributes based on the provided user input. + adjust_unit(): Adjusts the unit of measurement based on user settings, applying appropriate metric prefixes. + set_paths(): Initializes the directory paths for storing outputs and analysis results. + init_models(): Reads simulation data from Parquet files and initializes Model instances. + compute_windowed_aggregation(): Processes the raw data by applying a windowed aggregation function for smoothing. + generate_plot(): Orchestrates the generation of the specified plot type by calling the respective plotting functions. + generate_time_series_plot(): Generates a time series plot of the aggregated data. + generate_cumulative_plot(): Creates a bar chart showing cumulative data for each model. + generate_cumulative_time_series_plot(): Produces a plot that displays cumulative data over time for each model. + save_plot(): Saves the generated plot to a PDF file in the specified directory. + output_stats(): Writes detailed statistics of the simulation to an analysis file for record-keeping. + mean_of_chunks(np_array, window_size): Calculates the mean of data segments for smoothing and processing. + get_cumulative_limits(model_sums): Determines appropriate x-axis limits for cumulative plots based on the model data. 
+ + Usage: + To use this class, instantiate it with a dictionary of user settings, a path for outputs, and optionally a window size. + Call the `generate_plot` method to process the data and generate plots as configured by the user. + """ + + COLOR_PALETTE: list[str] = [ + # Colorblind-friendly palette + "#0072B2", "#E69F00", "#009E73", "#D55E00", "#CC79A7", "#F0E442", "#8B4513", + "#56B4E9", "#F0A3FF", "#FFB400", "#00BFFF", "#90EE90", "#FF6347", "#8A2BE2", "#CD5C5C", + "#4682B4", "#FFDEAD", "#32CD32", "#D3D3D3", "#999999" + ] + + def __init__(self, config: SimulationConfig, window_size: int = -1): + """ + Initializes the MultiModel with provided user settings and prepares the environment. + + :param user_input (dict): Configurations and settings from the user. + :param path (str): Path where output and analysis will be stored. + :param window_size (int): The size of the window to aggregate data; uses user input if -1. + :return: None + """ + + self.config: SimulationConfig = config + self.starting_time: float = time() + self.workload_time = None + self.timestamps = None + self.plot_path: str | None = None + + self.window_size = config.window_size if window_size == -1 else window_size + self.measure_unit: str + self.unit_scaling: int + self.measure_unit, self.unit_scaling = adjust_unit(config.current_unit, config.unit_scaling_magnitude) + + self.models: list[Model] = [] + self.max_model_len = 0 + + try: + os.makedirs(self.config.output_path, exist_ok=True) + self.analysis_file: IO = open(config.output_path + "/analysis.txt", "w") + except Exception as e: + print(f"Error handling output directory: {e}") + exit(1) + + self.analysis_file.write("Analysis file create\n") + + self.init_models() + if self.config.is_metamodel: + self.COLOR_PALETTE = ["#b3b3b3" for _ in range(len(self.models))] + if len(self.config.plot_colors) > 0: + self.COLOR_PALETTE = self.config.plot_colors + self.compute_windowed_aggregation() + + def get_model_path(self, dir: str) -> str: + return ( 
+ f"{self.config.simulation_path}/" + f"{dir}/" + f"seed={self.config.seed}/" + f"{SIMULATION_DATA_FILE}.parquet" + ) + + def init_models(self): + """ + Initializes models from the simulation output stored in Parquet files. This method reads each Parquet file, + processes the relevant data, and initializes Model instances which are stored in the model list. + + :return: None + :raise ValueError: If the unit scaling has not been set prior to model initialization. + """ + if self.unit_scaling is None: + raise ValueError("Unit scaling factor is not set. Please ensure it is set correctly.") + + simulation_directories = os.listdir(self.config.simulation_path) + simulation_directories.sort() + + for sim_dir in simulation_directories: + print("Processing simulation: ", sim_dir) + if sim_dir == "metamodel": + continue + + simulation_id: str = os.path.basename(sim_dir) + columns_to_read = ['timestamp', self.config.metric] + parquet_file = pq.read_table(self.get_model_path(sim_dir), columns=columns_to_read).to_pandas() + + grouped_data = parquet_file.groupby('timestamp')[self.config.metric].sum() + # Apply unit scaling to the raw data + raw = np.divide(grouped_data.values, self.unit_scaling) + timestamps = parquet_file['timestamp'].unique() + + model = Model(raw_sim_data=raw, identifier=simulation_id) + self.models.append(model) + + if self.timestamps is None or len(self.timestamps) > len(timestamps): + self.timestamps = timestamps + + self.max_model_len = min([len(model.raw_sim_data) for model in self.models]) + + def compute_windowed_aggregation(self) -> None: + """ + Applies a windowed aggregation function to each model's dataset. This method is typically used for smoothing + or reducing data granularity. It involves segmenting the dataset into windows of specified size and applying + an aggregation function to each segment. + + :return: None + :side effect: Modifies each model's processed_sim_data attribute to contain aggregated data. 
+ """ + if self.config.plot_type == PlotType.CUMULATIVE: + return + + for model in self.models: + numeric_values = model.raw_sim_data + model.processed_sim_data = self.mean_of_chunks(numeric_values, self.config.window_size) + + def generate_plot(self): + """ + Creates and saves plots based on the processed data from multiple models. This method determines + the type of plot to generate based on user input and invokes the appropriate plotting function. + + The plotting options supported are 'time_series', 'cumulative', and 'cumulative_time_series'. + Depending on the type specified, this method delegates to specific plot-generating functions. + + :return: None + :raises ValueError: If the plot type specified is not recognized or supported by the system. + :side effect: + - Generates and saves a plot to the file system. + - Updates the plot attributes based on the generated plot. + - Displays the plot on the matplotlib figure canvas. + """ + plt.figure(figsize=self.config.fig_size) + + plt.xticks(size=32) + plt.yticks(size=32) + plt.ylabel(self.config.y_axis.label, size=26) + plt.xlabel(self.config.x_axis.label, size=26) + plt.title(self.config.plot_title, size=26) + plt.grid() + + formatter = FuncFormatter(lambda x, _: '{:,}'.format(int(x)) if x >= 1000 else int(x)) + ax = plt.gca() + ax.xaxis.set_major_formatter(formatter) + + if self.config.x_axis.has_ticks(): + ax = plt.gca() + ax.xaxis.set_major_locator(MaxNLocator(self.config.x_axis.ticks)) + + if self.config.y_axis.has_ticks(): + ax = plt.gca() + ax.yaxis.set_major_locator(MaxNLocator(self.config.y_axis.ticks)) + + self.set_axis_limits() + + match self.config.plot_type: + case PlotType.TIME_SERIES: + self.generate_time_series_plot() + case PlotType.CUMULATIVE: + self.generate_cumulative_plot() + case PlotType.CUMULATIVE_TIME_SERIES: + self.generate_cumulative_time_series_plot() + + plt.tight_layout() + plt.subplots_adjust(right=0.85) + self.save_plot() + self.output_stats() + + def 
generate_time_series_plot(self): + """ + Plots time series data for each model. This function iterates over each model, applies the defined + windowing function to smooth the data, and plots the resulting series. + + :return: None + :side effect: Plots are displayed on the matplotlib figure canvas. + """ + + for i, model in enumerate(self.models): + label = "Meta-Model" if model.is_meta_model() else "Model " + str(model.id) + + if model.is_meta_model(): + repeated_means = np.repeat(model.processed_sim_data, self.window_size) + plt.plot(repeated_means, drawstyle='steps-mid', label=label, color="#228B22", linestyle="solid", + linewidth=2) + else: + means = self.mean_of_chunks(model.raw_sim_data, self.window_size) + repeated_means = np.repeat(means, self.window_size)[:len(model.raw_sim_data)] + plt.plot(repeated_means, drawstyle='steps-mid', label=label, color=self.COLOR_PALETTE[i]) + + def generate_cumulative_plot(self): + """ + Generates a horizontal bar chart showing cumulative data for each model. This function + aggregates total values per model and displays them in a bar chart, providing a visual + comparison of total values across models. + + :return: None + :side effect: Plots are displayed on the matplotlib figure canvas. 
+ """ + plt.xlim(self.get_cumulative_limits(model_sums=self.sum_models_entries())) + plt.ylabel("Model ID", size=30) + plt.xlabel(self.config.x_axis.label, size=30) + + ax = plt.gca() + ax.tick_params(axis='x', which='major', length=12) # Set length of the ticks + ax.set_xticklabels([]) # Hide x-axis numbers + ax.xaxis.set_minor_locator(AutoMinorLocator(5)) # Set two minor ticks between majors + ax.tick_params(axis='x', which='minor', length=7, color='black') + plt.yticks(range(len(self.models)), [model.id for model in self.models]) + + plt.grid(False) + + cumulated_energies = self.sum_models_entries() + + for i, model in (enumerate(self.models)): + label = "Meta-Model" if model.is_meta_model() else "Model " + str(model.id) + if model.is_meta_model(): + plt.barh(i, cumulated_energies[i], label=label, color='#009E73', hatch='//') + plt.text(cumulated_energies[i], i, str(int(round(cumulated_energies[i], 0))), ha='left', va='center', + size=26) + else: + round_decimals = 0 if cumulated_energies[i] > 500 else 1 + plt.barh(label=label, y=i, width=cumulated_energies[i], color=self.COLOR_PALETTE[i]) + plt.text(cumulated_energies[i], i, str(int(round(cumulated_energies[i], round_decimals))), ha='left', + va='center', size=26) + + def generate_cumulative_time_series_plot(self): + """ + Generates a plot showing the cumulative data over time for each model. This visual representation is + useful for analyzing trends and the accumulation of values over time. + + :return: None + :side effect: Displays the cumulative data over time on the matplotlib figure canvas. 
+ """ + self.compute_cumulative_time_series() + + for i, model in enumerate(self.models): + label = "Meta-Model" if model.is_meta_model() else "Model " + str(model.id) + if model.is_meta_model(): + cumulative_repeated = np.repeat(model.cumulative_time_series_values, self.window_size)[ + :len(model.processed_sim_data) * self.window_size] + plt.plot(cumulative_repeated, label=label, drawstyle='steps-mid', color="#228B22", linestyle="solid", + linewidth=2) + else: + cumulative_repeated = np.repeat(model.cumulative_time_series_values, self.window_size)[ + :len(model.raw_sim_data)] + plt.plot(cumulative_repeated, drawstyle='steps-mid', label=("Model " + str(model.id)), + color=self.COLOR_PALETTE[i]) + + def compute_cumulative_time_series(self): + """ + Computes the cumulative sum of processed data over time for each model, storing the result for use in plotting. + + :return: None + :side effect: Updates each model's 'cumulative_time_series_values' attribute with the cumulative sums. + """ + for model in self.models: + cumulative_array = [] + _sum = 0 + for value in model.processed_sim_data: + _sum += value + cumulative_array.append(_sum * self.window_size) + model.cumulative_time_series_values = cumulative_array + + def save_plot(self): + """ + Saves the current plot to a PDF file in the specified directory, constructing the file path from the + plot attributes and ensuring that the directory exists before saving. + + :return: None + :side effect: Creates or overwrites a PDF file containing the plot in the designated folder. 
+ """ + output_dir = f"{self.config.output_path}/simulation-analysis/{self.config.metric}" + try: + os.makedirs(output_dir, exist_ok=True) + except OSError as e: + print(f"Error handling output directory: {e}") + exit(1) + + self.plot_path: str = ( + f"{output_dir}/" + f"{self.config.plot_type}" + f"_plot_multimodel_metric={self.config.metric}" + f"_window={self.window_size}" + f".pdf" + ) if self.config.figure_export_name is None \ + else f"{output_dir}/{self.config.figure_export_name}.pdf" + + plt.savefig(self.plot_path) + + def set_axis_limits(self) -> None: + """ + Sets the x-axis and y-axis limits for the current plot based on the user-defined configuration. + This method ensures that the plot displays the data within the specified range, enhancing readability. + """ + if self.config.x_axis.has_range(): + plt.xlim(left=self.config.x_axis.value_range[0], right=self.config.x_axis.value_range[1]) + + if self.config.y_axis.has_range(): + plt.ylim(bottom=self.config.y_axis.value_range[0], top=self.config.y_axis.value_range[1]) + + def sum_models_entries(self): + """ + Computes the total values from each model for use in cumulative plotting. This method aggregates + the data across all models and prepares it for cumulative display. + + :return: List of summed values for each model, useful for plotting and analysis. + """ + models_sums = [] + for i, model in enumerate(self.models): + if model.is_meta_model(): + models_sums.append(model.cumulated) + else: + cumulated_energy = model.raw_sim_data.sum() + cumulated_energy = round(cumulated_energy, 2) + models_sums.append(cumulated_energy) + + return models_sums + + def output_stats(self) -> None: + """ + Records and writes detailed simulation statistics to an analysis file. This includes time stamps, + performance metrics, and other relevant details. + + :return: None + :side effect: Appends detailed simulation statistics to an existing file for record-keeping and analysis. 
+ """ + end_time: float = time() + self.analysis_file.write(dedent( + f""" + ========================================================= + Simulation made at {strftime("%Y-%m-%d %H:%M:%S")} + Metric: {self.config.metric} + Unit: {self.measure_unit} + Window size: {self.window_size} + Sample count in raw sim data: {self.max_model_len} + Computing time {round(end_time - self.starting_time, 1)}s + Plot path: {self.plot_path} + ========================================================= + """ + )) + + def mean_of_chunks(self, np_array: np.array, window_size: int) -> np.array: + """ + Calculates the mean of data within each chunk for a given array. This method helps in smoothing the data by + averaging over specified 'window_size' segments. + + :param np_array: Array of numerical data to be chunked and averaged. + :param window_size: The size of each segment to average over. + :return: np.array: An array of mean values for each chunk. + """ + if window_size == 1: + return np_array + + chunks: list[np.array] = [np_array[i:i + window_size] for i in range(0, len(np_array), window_size)] + means: list[float] = [np.mean(chunk) for chunk in chunks] + return np.array(means) + + def get_cumulative_limits(self, model_sums: list[float]) -> list[float]: + """ + Calculates the appropriate x-axis limits for cumulative plots based on the summarized data from each model. + + :param model_sums: List of summed values for each model. + :return: list[float]: A list containing the minimum and maximum values for the x-axis limits. 
+ """ + axis_min = min(model_sums) * 0.9 + axis_max = max(model_sums) * 1.1 + + if self.config.x_axis.value_range is not None: + axis_min = self.config.x_axis.value_range[0] + axis_max = self.config.x_axis.value_range[1] + + return [axis_min * 0.9, axis_max * 1.1] diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/requirements.txt b/opendc-experiments/opendc-experiments-m3sa/src/main/python/requirements.txt index cbd22985..ee1189f8 100644 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/python/requirements.txt +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/requirements.txt @@ -1,4 +1,5 @@ -matplotlib==3.8.4 -numpy==2.1.1 -pandas==2.2.2 -pyarrow==16.1.0 +matplotlib >= 3.10.0 +numpy >= 2.2.0 +pandas >= 2.2.3 +pyarrow >= 18.1.0 +tqdm diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/simulator_specifics.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/simulator_specifics.py deleted file mode 100644 index 4e1c36e1..00000000 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/python/simulator_specifics.py +++ /dev/null @@ -1,14 +0,0 @@ -""" -This file is the integration layer of the M3SA tool upon any (ICT) simulator. - -The system will use the elements from this file in the analysis / meta-simulation process. -""" - -""" -SIMULATION_DATA_FILE (str): The name of the file containing the simulation data. Enter only the name, not the path, not -the extension. The data file must be parquet format. 
- -✅ Good: "host", "simulation_data", "cats_predictions" -❌ Wrong: "host.json", "opendc/folder_x/folder_y/data" -""" -SIMULATION_DATA_FILE = "host" # opendc outputs in file host.parquet diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/__init__.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/__init__.py new file mode 100644 index 00000000..120c2f56 --- /dev/null +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/__init__.py @@ -0,0 +1,2 @@ +from .config import parse_configuration, SimulationConfig, PlotAxis, PlotType +from .util import * diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/accuracy_evaluator.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/accuracy_evaluator.py index 463f69e6..0fae4898 100644 --- a/opendc-experiments/opendc-experiments-m3sa/src/main/python/accuracy_evaluator.py +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/accuracy_evaluator.py @@ -1,6 +1,6 @@ import numpy as np -from models.MetaModel import MetaModel +from models.meta_model import MetaModel def accuracy_evaluator( @@ -26,7 +26,7 @@ def accuracy_evaluator( :return: None, but prints the accuracy metrics """ - meta_model = MetaModel(multimodel=multi_model) + meta_model = MetaModel(multi_model=multi_model) multi_model.models.append(meta_model.meta_model) # metamodel # multi_model.models.append(Model(raw_host_data=real_data, id=-1, path=None)) # real-world data @@ -35,12 +35,12 @@ def accuracy_evaluator( f.write("Accuracy Report, against ground truth\n") for model in multi_model.models: - if only_metamodel and model.id != 101: + if only_metamodel and model.id != -101: continue if model.id == -1: f.write("Real-World data") - elif model.id == 101: + elif model.id == -101: f.write( f"Meta-Model, meta-function: {multi_model.user_input['meta_function']}, window_size: {meta_model.multi_model.window_size}") else: @@ -55,7 +55,7 @@ def accuracy_evaluator( 
real_data=real_data, simulation_data=simulation_data ) - f.write(f"\nMean Absolute Percentage Error (MAPE): {accuracy_mape}%") + f.write(f"| Mean Absolute Percentage Error (MAPE): {accuracy_mape}%\n") if compute_nad: accuracy_nad = nad( diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/config.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/config.py new file mode 100644 index 00000000..e0d9827b --- /dev/null +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/config.py @@ -0,0 +1,186 @@ +from json import JSONDecodeError, load +from warnings import warn +from numpy import mean, median +from typing import Callable +from enum import Enum +from sys import stderr +import os + +FUNCTIONS = { + "mean": mean, + "median": median, +} + + +class PlotType(Enum): + TIME_SERIES = "time_series" + CUMULATIVE = "cumulative" + CUMULATIVE_TIME_SERIES = "cumulative_time_series" + + def __str__(self) -> str: + return self.value + + +def get_plot_type(plot_type: str) -> PlotType: + """ + Returns the PlotType enum value for the given string + Args: + plot_type: the string representation of the plot type + Returns: + the PlotType enum value + """ + return next((pt for pt in PlotType if pt.value == plot_type), PlotType.TIME_SERIES) + + +class PlotAxis: + """ + This class represents an axis of a plot. It contains the label, value range, and number of ticks for the axis. 
+ Attributes: + label (str): the label of the axis + value_range (tuple[float, float]): the range of values for the axis + ticks (int): the number of ticks on the axis + """ + + def __init__(self, label: str, value_range: tuple[float, float] | None, ticks: int | None): + self.label = label + self.value_range = value_range + self.ticks = ticks + + def has_range(self) -> bool: + """ + Checks if the axis has a value range + Returns: + True if the axis has a value range, False otherwise + """ + return self.value_range is not None + + def has_ticks(self) -> bool: + """ + Checks if the axis has a number of ticks + Returns: + True if the axis has a number of ticks, False otherwise + """ + return self.ticks is not None + + +class SimulationConfig: + """ + This class represents the configuration of a simulation. + It contains all the necessary parameters to run a simulation using multiple models. + + Attributes: + is_multimodel (bool): whether the simulation is multimodel + is_metamodel (bool): whether the simulation is a metamodel + metric (str): the metric to be used + window_function (function): the window function to be used + meta_function (function): the meta function to be used + window_size (int): the window size + samples_per_minute (int): the number of samples per minute + current_unit (str): the current unit + unit_scaling_magnitude (int): the unit scaling magnitude + plot_type (str): the plot type + plot_title (str): the plot title + x_axis (PlotAxis): the x-axis + y_axis (PlotAxis): the y-axis + seed (int): the seed + fig_size (tuple[int, int]): the figure size + """ + + def __init__(self, input_json: dict[str, any], output_path: str, simulation_path: str): + """ + Initializes the SimulationConfig object with the given input JSON + Args: + input_json: the input JSON object + Raises: + ValueError: if the input JSON is missing required + fields or has invalid values for certain fields + """ + + if "metric" not in input_json: + raise ValueError("Required field 
'metric' is missing.") + if "meta_function" not in input_json and input_json["metamodel"]: + raise ValueError( + "Required field 'meta_function' is missing. Please select between 'mean' and 'median'. " + "Alternatively, disable metamodel in the config file." + ) + if input_json["meta_function"] not in FUNCTIONS: + raise ValueError( + "Invalid value for meta_function. Please select between 'mean' and 'median'." + ) + if "multimodel" not in input_json and input_json["metamodel"]: + warn("Warning: Missing 'multimodel' field. Defaulting to 'True'.") + + self.output_path: str = output_path + self.simulation_path: str = simulation_path + self.is_multimodel: bool = input_json.get("multimodel", True) + self.is_metamodel: bool = input_json.get("metamodel", False) + self.metric: str = input_json["metric"] + self.window_function: Callable[[any], float] = FUNCTIONS[input_json.get("window_function", "mean")] + self.meta_function: Callable[[any], float] = FUNCTIONS[input_json.get("meta_function", "mean")] + self.window_size: int = input_json.get("window_size", 1) + self.samples_per_minute: int = input_json.get("samples_per_minute", 0) + self.current_unit: str = input_json.get("current_unit", "") + self.unit_scaling_magnitude: int = input_json.get("unit_scaling_magnitude", 1) + self.plot_type: PlotType = next( + (pt for pt in PlotType if pt.value == input_json.get("plot_type", "time_series")), PlotType.TIME_SERIES) + self.plot_title: str = input_json.get("plot_title", "") + self.x_axis: PlotAxis = PlotAxis( + input_json.get("x_label", ""), + parse_range(input_json, "x"), + input_json.get("x_ticks_count", None) + ) + self.y_axis: PlotAxis = PlotAxis( + input_json.get("y_label", ""), + parse_range(input_json, "y"), + input_json.get("y_ticks_count", None) + ) + self.seed: int = input_json.get("seed", 0) + self.fig_size: tuple[int, int] = input_json.get("figsize", (20, 10)) + self.plot_colors: list[str] = input_json.get("plot_colors", []) + self.figure_export_name: str | None = 
input_json.get("figure_export_name", None) + + +def parse_range(user_input: dict[str, any], key: str) -> tuple[float, float] | None: + """ + Parses a range from the user input + Args: + user_input: the user input dictionary + key: the key of the range + + Returns: + a tuple containing the minimum and maximum values of the range + """ + + if f"{key}_min" not in user_input or f"{key}_max" not in user_input: + return None + + return user_input[f"{key}_min"], user_input[f"{key}_max"] + + +def parse_configuration(config_path: str, output_path: str, simulation_path: str) -> SimulationConfig: + """ + Reads the input JSON file and returns a SimulationConfig object + Args: + config_path: the path to the input JSON file + output_path: the path to the output folder + simulation_path: the path to the simulation folder + + Returns: + a SimulationConfig object + """ + + try: + with (open(config_path, 'r') as json): + input_json: dict[str, any] = load(json) + except JSONDecodeError: + stderr.write(f"Error decoding JSON in file: {config_path}") + exit(1) + except IOError: + stderr.write(f"Error reading file: {config_path}") + exit(1) + + try: + return SimulationConfig(input_json, output_path, simulation_path) + except ValueError as err: + print(f"Error parsing input JSON: {err}") + exit(1) diff --git a/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/util.py b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/util.py new file mode 100644 index 00000000..067af53f --- /dev/null +++ b/opendc-experiments/opendc-experiments-m3sa/src/main/python/util/util.py @@ -0,0 +1,76 @@ +from json import JSONDecodeError, load + +UNIT_FACTORS: dict[int, str] = { + -9: 'n', + -6: 'μ', + -3: 'm', + 0: '', + 1: 'k', + 3: 'M', + 6: 'G', + 9: 'T' +} + +SIMULATION_ANALYSIS_FOLDER_NAME = 'simulation-analysis' +EMISSIONS_ANALYSIS_FOLDER_NAME = 'carbon_emission' +ENERGY_ANALYSIS_FOLDER_NAME = 'power_draw' + +""" +SIMULATION_DATA_FILE (str): The name of the file containing the 
def adjust_unit(target_unit: str, magnitude: int) -> tuple[str, int]:
    """
    Prefix ``target_unit`` with the symbol mapped to ``magnitude`` in the
    module-level UNIT_FACTORS table and return the matching multiplier.

    Doc fix: the original example claimed adjust_unit('W', 3) -> ('kW', 1000);
    per UNIT_FACTORS, 3 maps to 'M', so it actually returns ('MW', 1000).

    Example:
        adjust_unit('W', 3) -> ('MW', 1000)

    NOTE(review): UNIT_FACTORS pairs 'k' with 10**1 and 'M' with 10**3 —
    confirm that is intentional rather than the conventional SI exponents.

    Args:
        target_unit: The target unit to adjust.
        magnitude: The magnitude to adjust the unit by.

    Returns:
        (prefixed unit, 10 ** magnitude) for known magnitudes; otherwise
        (target_unit unchanged, 1).
    """
    if magnitude in UNIT_FACTORS:
        return UNIT_FACTORS[magnitude] + target_unit, 10 ** magnitude
    return target_unit, 1


def clean_analysis_file(metric: str) -> None:
    """
    Truncate (or create) the analysis.txt file for the given metric.

    The file lives under ``simulation-analysis/<power_draw|carbon_emission>/``
    resolved against the current working directory (the metric "power_draw"
    selects the energy folder; anything else selects the emissions folder).

    :side effect: overwrites the analysis file with empty content.
    """
    subfolder = ENERGY_ANALYSIS_FOLDER_NAME if metric == "power_draw" else EMISSIONS_ANALYSIS_FOLDER_NAME
    analysis_file_path = f"{SIMULATION_ANALYSIS_FOLDER_NAME}/{subfolder}/analysis.txt"

    with open(analysis_file_path, "w") as f:
        f.write("")


def parse_json(json_path: str) -> dict[str, any]:
    """
    Parse a JSON file and return the dictionary representation.

    On decode or I/O errors a message is printed and the process exits with
    status 1.
    NOTE(review): errors go to stdout via print() and use the exit() builtin;
    consider sys.stderr / sys.exit for consistency with config.py.

    Args:
        json_path: The path to the JSON file.

    Returns:
        A dictionary containing the JSON data.
    """
    try:
        with open(json_path, 'r') as raw_json:
            return load(raw_json)
    except JSONDecodeError:
        print(f"Error decoding JSON in file: {json_path}")
        exit(1)
    except IOError:
        print(f"Error reading file: {json_path}")
        exit(1)
