import os
import logging
from datetime import datetime

import numpy as np
import xarray as xr

from src.computation.statistics_computation import read_mfnetcdf, calculate_stats, compute_to_netcdf, \
    calculate_counts_xhistogram, read_netcdf, calculate_probabilities
from src.errors.error import NoFilesReady, MissingVariables
from src.interfaces.execution_process import ExecutionProcess
from src.config.config import get_config_data
from src.utilities.file_parser import build_path_string, get_file_paths, filter_files

config = get_config_data()

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class Computation(ExecutionProcess):
    def __init__(self, model_name, **kwargs):
        super().__init__(**kwargs)
        self.model_name = model_name
        self.source_dir = config["computation"]["source_dir"]
        self.output_dir = config["computation"]["output_dir"]
        self.date = datetime.now().strftime('%Y%m%d') if config["general"]["date"] == "latest" \
            else config["general"]["date"]
        self.variables = ["water_speed", "water_u", "water_v", "water_temp", "salinity", "sound_speed", "density"]
        self.analysis_methods = ["avg", "min", "max", "std", "sum", "ssx"]
        self.aparms_file_names = ["conjugate_depth", "deep_sound_channel", "multi_criteria_analysis",
                                  "secondary_sound_channel", "surface_duct"]
        # Histogram bin edges for each acoustic parameter; -inf/inf edges make the end bins open-ended.
        self.aparms_var_data = {
            "acousticConjugateDepth1DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth2DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth3DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth4DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth5DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth6DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticBottomAngle": np.array([-np.inf, 3, 5, 9, 13, 17, 21, 25, np.inf]),
            "acousticDeepSoundChannelAxisDepth": np.array([-np.inf, 1, 250, 500, 1000, 1500, 2000, 3000, 4000, np.inf]),
            "acousticDeepSoundChannelDepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticVerticalAngle": np.array([-np.inf, 3, 5, 9, 13, 17, 21, 25, np.inf]),
            "acousticSecondarySoundChannelOneAxisDepth": np.array([0, 1, 100, 200, 300, 400, 500, 750, 1000, np.inf]),
            "acousticSecondarySoundChannelTwoAxisDepth": np.array([0, 1, 100, 200, 300, 400, 500, 750, 1000, np.inf]),
            "acousticSecondarySoundChannelOneCutoffFrequency": np.array(
                [-np.inf, 0, 100, 200, 300, 400, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, np.inf]),
            "acousticSecondarySoundChannelTwoCutoffFrequency": np.array(
                [-np.inf, 0, 100, 200, 300, 400, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, np.inf]),
            "acousticSurfaceDuctCutoffFrequency": np.array(
                [-np.inf, 0, 100, 200, 300, 400, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, np.inf]),
            "acousticComplexHalfChannelCutoffFrequency": np.array(
                [-np.inf, 0, 100, 200, 300, 400, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, np.inf]),
            "acousticSurfaceDuctSonicLayerDepth": np.array(
                [-np.inf, 25, 50, 100, 150, 200, 250, 300, 350, 400, 450, np.inf]),
            "acousticSurfaceDuctSoundSpeedBelowLayerGradient": np.array(
                [-np.inf, 5, 10, 15, 20, 25, 30, 35, 40, np.inf]),
            "acousticConjugateDepth1Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth2Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth3Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth4Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth5Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth6Mask": np.array([-1.0, 0, np.inf]),
            "acousticDeepSoundChannelMask": np.array([-1.0, 0, np.inf]),
            "acousticSecondarySoundChannelOneMask": np.array([-1.0, 0, np.inf]),
            "acousticSecondarySoundChannelTwoMask": np.array([-1.0, 0, np.inf]),
            "acousticSurfaceDuctMask": np.array([-1.0, 0, np.inf]),
            "acousticComplexHalfChannelMask": np.array([-1.0, 0, np.inf])
        }
        self.counts_encoding = {"dtype": "int32", "missing_value": -30000, "_FillValue": -30000, "zlib": True}

    def run(self):
        logger.info("Starting computation process...")

        # Initialize base directories
        year_month = datetime.strptime(self.date, "%Y%m%d").strftime("%Y%m")
        year_month_day = datetime.strptime(self.date, "%Y%m%d").strftime("%Y%m%d")
        path_to_source = build_path_string(self.source_dir, self.model_name, year_month)
        files_in_path = get_file_paths(path_to_source, f"*{year_month_day}*.nc")
        logger.info(f"Found {len(files_in_path)} files in source directory: {files_in_path}")

        # Set up filter to only gather files related to the analysis
        filtered_files_in_path = filter_files(files_in_path, self.variables, self.model_name)
        logger.info(f"Filtered files to {len(filtered_files_in_path)} files: {filtered_files_in_path}")
        if not filtered_files_in_path:
            logger.error(f"No files found after filtering for variables: {self.variables}")
            return

        # Change the output path to store the calculated files
        calculated_output_path = build_path_string(self.output_dir, self.model_name, year_month)
        logger.info(f"Calculated output path: {calculated_output_path}")

        # Read in the data
        logger.info(f"Reading data from files: {filtered_files_in_path}")
        data = read_mfnetcdf(filtered_files_in_path, chunks={"time": "auto"}, drop_vars=["tau"])
        logger.info(f"Variables in the dataset: {list(data.variables)}")
        missing_vars = [var for var in self.variables if var not in data.variables]
        if missing_vars:
            logger.error(f"Missing the following variables to compute: {missing_vars}")
            raise MissingVariables(f"Missing the following variables to compute: {missing_vars}")

        # Compute statistics
        for variable in self.variables:
            for analysis_method in self.analysis_methods:
                logger.info(f"Computing {analysis_method} for {variable}")
                stat_data = calculate_stats(data, var_compute=variable, analysis_method=analysis_method)
                compute_to_netcdf(stat_data, file_name=f"{self.model_name}-{year_month_day}",
                                  output_dir=calculated_output_path, mode="a")
                logger.info(f"Computed {analysis_method} for {variable}")

        if "water_speed" in data.variables:
            logger.info("Computing counts and probabilities for water_speed")
            # Bin the speeds into counts, write them to a temporary file, then convert the
            # counts into probabilities before appending them to the final output file.
            cnt_data = calculate_counts_xhistogram(data, "water_speed", "spd_cnts", "spd_bin", exclude_last_bin=True)
            cnts_out_path = compute_to_netcdf(cnt_data, "spd_cnts", output_dir=f"{path_to_source}/temp",
                                              file_name="output_cnts")
            cnt_data = read_netcdf(cnts_out_path)
            # Mask grid cells where every bin is empty so they stay NaN instead of zero-probability
            cnt_data = cnt_data.where(~(cnt_data.sum(dim=["spd_bin"]) == 0), other=np.nan)
            prob_cnt = calculate_probabilities(cnt_data, "spd_cnts", "spd_bin")
            compute_to_netcdf(prob_cnt, file_name=f"{self.model_name}-{year_month_day}",
                              output_dir=calculated_output_path, mode="a",
                              encoding={"spd_cnts": self.counts_encoding})
            os.remove(cnts_out_path)
            logger.info("Computed counts and probabilities for water_speed")

        logger.info("Starting APARMS file computations")
        aparms_files = filter_files(files_in_path, self.aparms_file_names)
        logger.info(f"Filtered APARMS files to {len(aparms_files)} files: {aparms_files}")
        if aparms_files:
            aparms_data = read_mfnetcdf(aparms_files, chunks={"time": "auto"}, decode_times=False)
            # Split the dataset by time step so each time's histogram is written to its own file
            times, datasets = zip(*aparms_data.groupby("time"))
            for aparm_var, bins in self.aparms_var_data.items():
                logger.info(f"Computing {aparm_var} with bins: {bins}")
                compute_sets = [calculate_counts_xhistogram(d_set, aparm_var, f"{aparm_var}Cnts", bins=bins)
                                for d_set in datasets]
                paths = [f"{calculated_output_path}/aparms_{year_month_day}_t{int(t):04d}.nc" for t in times]
                xr.save_mfdataset(compute_sets, paths, mode="a",
                                  encoding={f"{aparm_var}Cnts": self.counts_encoding})
                logger.info(f"Computed {aparm_var} with {len(times)} files.")

        logger.info("Computation process completed successfully.")

    def check_if_can_run(self):
        year_month = datetime.strptime(self.date, "%Y%m%d").strftime("%Y%m")
        year_month_day = datetime.strptime(self.date, "%Y%m%d").strftime("%Y%m%d")
        path_to_source = build_path_string(self.source_dir, self.model_name, year_month)
        files_in_path = get_file_paths(path_to_source, f"*{year_month_day}*.nc")
        logger.info(f"Checking if computation can run with files: {files_in_path}")
        if not files_in_path:
            logger.error(f"No files found in path: {path_to_source}")
            raise NoFilesReady(f"No files found for {self.model_name} on {self.date}")

        filtered_files_in_path = filter_files(files_in_path, self.variables, self.model_name)
        logger.info(f"Filtered files for required variables: {filtered_files_in_path}")
        if not filtered_files_in_path:
            logger.error(f"No files found for variables: {self.variables}")
            return False

        data = read_mfnetcdf(filtered_files_in_path)
        logger.info(f"Variables in the dataset: {list(data.variables)}")

        # Verify that the variables to compute exist in the dataset
        existing_vars = [y for y in data.data_vars if y in self.variables]
        if len(existing_vars) != len(self.variables):
            missing_vars = [y for y in self.variables if y not in data.data_vars]
            error = f"Missing the following variables to compute: {missing_vars}"
            logger.error(error)
            raise MissingVariables(error)

        logger.info("All necessary variables are present.")
        return True
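# A minimal sketch (not part of the original module) of what the bin-edge arrays in
# aparms_var_data mean: they are histogram edges, and the -inf/inf entries make the first and
# last bins open-ended. calculate_counts_xhistogram presumably applies the same binning
# semantics as np.histogram, illustrated here with the acousticBottomAngle edges. The function
# name below is hypothetical and exists only for demonstration.
def _demo_bottom_angle_binning():
    angles = np.array([1.0, 4.2, 10.0, 30.0])
    counts, _ = np.histogram(angles, bins=np.array([-np.inf, 3, 5, 9, 13, 17, 21, 25, np.inf]))
    # counts == [1, 1, 0, 1, 0, 0, 0, 1]: 1.0 falls in (-inf, 3), 30.0 in [25, inf)
    return counts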
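# A hedged usage sketch (also not from the original file). It assumes ExecutionProcess needs no
# required constructor arguments beyond **kwargs and that the config file supplies the
# "computation" and "general" sections read above; "hycom" is a hypothetical model name, so
# substitute whichever models the pipeline actually supports.
if __name__ == "__main__":
    computation = Computation(model_name="hycom")  # "hycom" is a placeholder model name
    try:
        if computation.check_if_can_run():
            computation.run()
    except (NoFilesReady, MissingVariables) as exc:
        logger.error(f"Computation prerequisites not met: {exc}")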