Untitled

mail@pastecode.io avatar
unknown
plain_text
a month ago
11 kB
1
Indexable
Never
import os

import xarray as xr
import numpy as np
from datetime import datetime
import logging

from src.computation.statistics_computation import read_mfnetcdf, calculate_stats, compute_to_netcdf, \
    calculate_counts_xhistogram, read_netcdf, calculate_probabilities
from src.errors.error import NoFilesReady, MissingVariables
from src.interfaces.execution_process import ExecutionProcess
from src.config.config import get_config_data
from src.utilities.file_parser import build_path_string, get_file_paths, filter_files

config = get_config_data()

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class Computation(ExecutionProcess):

    def __init__(self, model_name, **kwargs):
        super().__init__(**kwargs)
        self.model_name = model_name
        self.source_dir = config["computation"]["source_dir"]
        self.output_dir = config["computation"]["output_dir"]
        self.date = str(datetime.now().strftime('%Y%m%d')) if config["general"]["date"] == "latest" else \
        config["general"]["date"]
        self.variables = ["water_speed", "water_u", "water_v", "water_temp", "salinity", "sound_speed", "density"]
        self.analysis_methods = ["avg", "min", "max", "std", "sum", "ssx"]
        self.aparms_file_names = ["conjugate_depth", "deep_sound_channel", "multi_criteria_analysis",
                                  "secondary_sound_channel", "surface_duct"]

        self.aparms_var_data = {
            "acousticConjugateDepth1DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth2DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth3DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth4DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth5DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticConjugateDepth6DepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticBottomAngle": np.array([-np.inf, 3, 5, 9, 13, 17, 21, 25, np.inf]),
            "acousticDeepSoundChannelAxisDepth": np.array([-np.inf, 1, 250, 500, 1000, 1500, 2000, 3000, 4000, np.inf]),
            "acousticDeepSoundChannelDepthExcess": np.array([-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]),
            "acousticVerticalAngle": np.array([-np.inf, 3, 5, 9, 13, 17, 21, 25, np.inf]),
            "acousticSecondarySoundChannelOneAxisDepth": np.array([0, 1, 100, 200, 300, 400, 500, 750, 1000, np.inf]),
            "acousticSecondarySoundChannelTwoAxisDepth": np.array([0, 1, 100, 200, 300, 400, 500, 750, 1000, np.inf]),
            "acousticSecondarySoundChannelOneCutoffFrequency": np.array(
                [-np.inf, 0, 100, 200, 300, 400, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, np.inf]),
            "acousticSecondarySoundChannelTwoCutoffFrequency": np.array(
                [-np.inf, 0, 100, 200, 300, 400, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, np.inf]),
            "acousticSurfaceDuctCutoffFrequency": np.array(
                [-np.inf, 0, 100, 200, 300, 400, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, np.inf]),
            "acousticComplexHalfChannelCutoffFrequency": np.array(
                [-np.inf, 0, 100, 200, 300, 400, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, np.inf]),
            "acousticSurfaceDuctSonicLayerDepth": np.array(
                [-np.inf, 25, 50, 100, 150, 200, 250, 300, 350, 400, 450, np.inf]),
            "acousticSurfaceDuctSoundSpeedBelowLayerGradient": np.array(
                [-np.inf, 5, 10, 15, 20, 25, 30, 35, 40, np.inf]),
            "acousticConjugateDepth1Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth2Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth3Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth4Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth5Mask": np.array([-1.0, 0, np.inf]),
            "acousticConjugateDepth6Mask": np.array([-1.0, 0, np.inf]),
            "acousticDeepSoundChannelMask": np.array([-1.0, 0, np.inf]),
            "acousticSecondarySoundChannelOneMask": np.array([-1.0, 0, np.inf]),
            "acousticSecondarySoundChannelTwoMask": np.array([-1.0, 0, np.inf]),
            "acousticSurfaceDuctMask": np.array([-1.0, 0, np.inf]),
            "acousticComplexHalfChannelMask": np.array([-1.0, 0, np.inf])
        }

        self.counts_encoding = {"dtype": "int32", "missing_value": -30000, "_FillValue": -30000, "zlib": True}

    def run(self):
        logger.info("Starting computation process...")
        # Initialize base directories
        year_month = datetime.strptime(self.date, "%Y%m%d").strftime("%Y%m")
        year_month_day = datetime.strptime(self.date, "%Y%m%d").strftime("%Y%m%d")
        path_to_source = build_path_string(self.source_dir, self.model_name, year_month)
        files_in_path = get_file_paths(path_to_source, f"*{year_month_day}*.nc")

        logger.info(f"Found {len(files_in_path)} files in source directory: {files_in_path}")

        # Set up filter to only gather files related to the analysis
        filtered_files_in_path = filter_files(files_in_path, self.variables, self.model_name)
        logger.info(f"Filtered files to {len(filtered_files_in_path)} files: {filtered_files_in_path}")

        if not filtered_files_in_path:
            logger.error(f"No files found after filtering for variables: {self.variables}")
            return

        # Change the output path to store the calculated files
        calculated_output_path = build_path_string(self.output_dir, self.model_name, year_month)
        logger.info(f"Calculated output path: {calculated_output_path}")

        # Read in the data
        logger.info(f"Reading data from files: {filtered_files_in_path}")
        data = read_mfnetcdf(filtered_files_in_path, chunks={"time": "auto"}, drop_vars=["tau"])
        logger.info(f"Variables in the dataset: {list(data.variables)}")

        missing_vars = [var for var in self.variables if var not in data.variables]
        if missing_vars:
            logger.error(f"Missing the following variables to compute: {missing_vars}")
            raise MissingVariables(f"Missing the following variables to compute: {missing_vars}")

        # Compute statistics
        for variable in self.variables:
            for analysis_method in self.analysis_methods:
                logger.info(f"Computing {analysis_method} for {variable}")
                stat_data = calculate_stats(data, var_compute=variable, analysis_method=analysis_method)
                compute_to_netcdf(stat_data, file_name=f"{self.model_name}-{year_month_day}",
                                  output_dir=calculated_output_path, mode="a")
                logger.info(f"Computed {analysis_method} for {variable}")

        if "water_speed" in data.variables:
            logger.info("Computing counts and probabilities for water_speed")
            cnt_data = calculate_counts_xhistogram(data, "water_speed", "spd_cnts", "spd_bin", exclude_last_bin=True)
            cnts_out_path = compute_to_netcdf(cnt_data, "spd_cnts", output_dir=f"{path_to_source}/temp",
                                              file_name="output_cnts")
            cnt_data = read_netcdf(cnts_out_path)
            cnt_data = cnt_data.where(~(cnt_data.sum(dim=["spd_bin"]) == 0), other=np.nan)
            prob_cnt = calculate_probabilities(cnt_data, "spd_cnts", "spd_bin")
            compute_to_netcdf(prob_cnt, file_name=f"{self.model_name}-{year_month_day}",
                              output_dir=calculated_output_path, mode="a", encoding={"spd_cnts": self.counts_encoding})
            os.remove(cnts_out_path)
            logger.info("Computed counts and probabilities for water_speed")

        logger.info("Starting APARMS file computations")
        aparms_files = filter_files(files_in_path, self.aparms_file_names)
        logger.info(f"Filtered APARMS files to {len(aparms_files)} files: {aparms_files}")

        if aparms_files:
            aparms_data = read_mfnetcdf(aparms_files, chunks={"time": "auto"}, decode_times=False)
            times, datasets = zip(*aparms_data.groupby("time"))
            for aparm_var, bins in self.aparms_var_data.items():
                logger.info(f"Computing {aparm_var} with bins: {bins}")
                compute_sets = [calculate_counts_xhistogram(d_set, aparm_var, f"{aparm_var}Cnts", bins=bins) for d_set
                                in datasets]
                paths = [f"{calculated_output_path}/aparms_{year_month_day}_t{int(t):04d}.nc" for t in times]
                xr.save_mfdataset(compute_sets, paths, mode="a", encoding={f"{aparm_var}Cnts": self.counts_encoding})
                logger.info(f"Computed {aparm_var} with {len(times)} files.")

        logger.info("Computation process completed successfully.")

    def check_if_can_run(self):
        year_month = datetime.strptime(self.date, "%Y%m%d").strftime("%Y%m")
        year_month_day = datetime.strptime(self.date, "%Y%m%d").strftime("%Y%m%d")
        path_to_source = build_path_string(self.source_dir, self.model_name, year_month)
        files_in_path = get_file_paths(path_to_source, f"*{year_month_day}*.nc")

        logger.info(f"Checking if computation can run with files: {files_in_path}")

        if not files_in_path:
            logger.error(f"No files found in path: {path_to_source}")
            raise NoFilesReady(f"No files found for {self.model_name} on {self.date}")

        filtered_files_in_path = filter_files(files_in_path, self.variables, self.model_name)
        logger.info(f"Filtered files for required variables: {filtered_files_in_path}")

        if not filtered_files_in_path:
            logger.error(f"No files found for variables: {self.variables}")
            return False

        data = read_mfnetcdf(filtered_files_in_path)
        logger.info(f"Variables in the dataset: {list(data.variables)}")

        # Verify that the variables to compute are existing in the dataset
        existing_vars = [y for y in data.data_vars if y in self.variables]

        if len(existing_vars) != len(self.variables):
            missing_vars = [y for y in self.variables if y not in data.data_vars]
            error = f"Missing the following variables to compute: {missing_vars}"
            logger.error(error)
            raise MissingVariables(error)

        logger.info("All necessary variables are present.")
        return True
Leave a Comment