"""Computation execution process: daily statistics, water-speed probability,
and APARMS histogram computations over model NetCDF output."""
import os
import xarray as xr
import numpy as np
from datetime import datetime
import logging
from src.computation.statistics_computation import read_mfnetcdf, calculate_stats, compute_to_netcdf, \
calculate_counts_xhistogram, read_netcdf, calculate_probabilities
from src.errors.error import NoFilesReady, MissingVariables
from src.interfaces.execution_process import ExecutionProcess
from src.config.config import get_config_data
from src.utilities.file_parser import build_path_string, get_file_paths, filter_files
# Application configuration is loaded once at import time and shared by all
# classes in this module.
config = get_config_data()
# Root logging config; per-module logger named after this module.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class Computation(ExecutionProcess):
    """Daily post-processing pipeline for one model's NetCDF output.

    Computes per-variable statistics (avg/min/max/std/sum/ssx), water-speed
    count/probability histograms, and per-time-step APARMS bin-count
    histograms, appending results as NetCDF under
    ``output_dir/<model>/<YYYYMM>``.
    """

    def __init__(self, model_name, **kwargs):
        """Configure paths, target date, and histogram bin definitions.

        Args:
            model_name: Model identifier used to build source/output paths
                and output file names.
            **kwargs: Forwarded unchanged to ``ExecutionProcess``.
        """
        super().__init__(**kwargs)
        self.model_name = model_name
        self.source_dir = config["computation"]["source_dir"]
        self.output_dir = config["computation"]["output_dir"]
        # "latest" resolves to today's date; otherwise the configured
        # YYYYMMDD string is used verbatim.
        self.date = (datetime.now().strftime("%Y%m%d")
                     if config["general"]["date"] == "latest"
                     else config["general"]["date"])
        self.variables = ["water_speed", "water_u", "water_v", "water_temp",
                          "salinity", "sound_speed", "density"]
        self.analysis_methods = ["avg", "min", "max", "std", "sum", "ssx"]
        self.aparms_file_names = ["conjugate_depth", "deep_sound_channel",
                                  "multi_criteria_analysis",
                                  "secondary_sound_channel", "surface_duct"]
        # Shared histogram bin-edge definitions; +/-inf marks open-ended
        # first/last bins. Each dict entry below gets its own array.
        depth_excess = [-np.inf, 0, 50, 100, 150, 200, 225, 250, 275, np.inf]
        angle = [-np.inf, 3, 5, 9, 13, 17, 21, 25, np.inf]
        dsc_axis_depth = [-np.inf, 1, 250, 500, 1000, 1500, 2000, 3000, 4000, np.inf]
        ssc_axis_depth = [0, 1, 100, 200, 300, 400, 500, 750, 1000, np.inf]
        cutoff_freq = [-np.inf, 0, 100, 200, 300, 400, 500, 1000, 1500,
                       2000, 2500, 3000, 3500, 4000, np.inf]
        sonic_layer_depth = [-np.inf, 25, 50, 100, 150, 200, 250, 300, 350, 400, 450, np.inf]
        gradient = [-np.inf, 5, 10, 15, 20, 25, 30, 35, 40, np.inf]
        mask = [-1.0, 0, np.inf]
        # APARMS variable name -> histogram bin edges.
        self.aparms_var_data = {
            **{f"acousticConjugateDepth{i}DepthExcess": np.array(depth_excess)
               for i in range(1, 7)},
            "acousticBottomAngle": np.array(angle),
            "acousticDeepSoundChannelAxisDepth": np.array(dsc_axis_depth),
            "acousticDeepSoundChannelDepthExcess": np.array(depth_excess),
            "acousticVerticalAngle": np.array(angle),
            "acousticSecondarySoundChannelOneAxisDepth": np.array(ssc_axis_depth),
            "acousticSecondarySoundChannelTwoAxisDepth": np.array(ssc_axis_depth),
            "acousticSecondarySoundChannelOneCutoffFrequency": np.array(cutoff_freq),
            "acousticSecondarySoundChannelTwoCutoffFrequency": np.array(cutoff_freq),
            "acousticSurfaceDuctCutoffFrequency": np.array(cutoff_freq),
            "acousticComplexHalfChannelCutoffFrequency": np.array(cutoff_freq),
            "acousticSurfaceDuctSonicLayerDepth": np.array(sonic_layer_depth),
            "acousticSurfaceDuctSoundSpeedBelowLayerGradient": np.array(gradient),
            **{f"acousticConjugateDepth{i}Mask": np.array(mask)
               for i in range(1, 7)},
            "acousticDeepSoundChannelMask": np.array(mask),
            "acousticSecondarySoundChannelOneMask": np.array(mask),
            "acousticSecondarySoundChannelTwoMask": np.array(mask),
            "acousticSurfaceDuctMask": np.array(mask),
            "acousticComplexHalfChannelMask": np.array(mask),
        }
        # NetCDF encoding applied to every count variable written out.
        self.counts_encoding = {"dtype": "int32", "missing_value": -30000,
                                "_FillValue": -30000, "zlib": True}

    def _source_context(self):
        """Resolve date parts, the source path, and this date's .nc files.

        Parsing ``self.date`` with strptime also validates its YYYYMMDD
        format (raises ValueError on a malformed configured date).

        Returns:
            Tuple ``(year_month, year_month_day, path_to_source,
            files_in_path)``.
        """
        parsed = datetime.strptime(self.date, "%Y%m%d")
        year_month = parsed.strftime("%Y%m")
        year_month_day = parsed.strftime("%Y%m%d")
        path_to_source = build_path_string(self.source_dir, self.model_name, year_month)
        files_in_path = get_file_paths(path_to_source, f"*{year_month_day}*.nc")
        return year_month, year_month_day, path_to_source, files_in_path

    def run(self):
        """Execute the full computation pipeline for ``self.date``.

        Returns early (without raising) when no source files match the
        required variables.

        Raises:
            MissingVariables: If a required variable is absent from the
                source dataset.
        """
        logger.info("Starting computation process...")
        year_month, year_month_day, path_to_source, files_in_path = self._source_context()
        logger.info("Found %d files in source directory: %s",
                    len(files_in_path), files_in_path)
        # Only keep files relevant to the statistics variables.
        filtered_files = filter_files(files_in_path, self.variables, self.model_name)
        logger.info("Filtered files to %d files: %s", len(filtered_files), filtered_files)
        if not filtered_files:
            logger.error("No files found after filtering for variables: %s", self.variables)
            return
        # Destination directory for all calculated files of this month.
        output_path = build_path_string(self.output_dir, self.model_name, year_month)
        logger.info("Calculated output path: %s", output_path)
        logger.info("Reading data from files: %s", filtered_files)
        data = read_mfnetcdf(filtered_files, chunks={"time": "auto"}, drop_vars=["tau"])
        logger.info("Variables in the dataset: %s", list(data.variables))
        missing_vars = [var for var in self.variables if var not in data.variables]
        if missing_vars:
            logger.error(f"Missing the following variables to compute: {missing_vars}")
            raise MissingVariables(f"Missing the following variables to compute: {missing_vars}")
        self._compute_statistics(data, output_path, year_month_day)
        if "water_speed" in data.variables:
            self._compute_speed_probabilities(data, path_to_source, output_path, year_month_day)
        self._compute_aparms(files_in_path, output_path, year_month_day)
        logger.info("Computation process completed successfully.")

    def _compute_statistics(self, data, output_path, year_month_day):
        """Append every statistic of every variable to the daily output file."""
        for variable in self.variables:
            for analysis_method in self.analysis_methods:
                logger.info("Computing %s for %s", analysis_method, variable)
                stat_data = calculate_stats(data, var_compute=variable,
                                            analysis_method=analysis_method)
                compute_to_netcdf(stat_data, file_name=f"{self.model_name}-{year_month_day}",
                                  output_dir=output_path, mode="a")
                logger.info("Computed %s for %s", analysis_method, variable)

    def _compute_speed_probabilities(self, data, path_to_source, output_path, year_month_day):
        """Compute water-speed bin counts and derived probabilities.

        Counts are staged to a temp NetCDF file (forcing computation),
        masked where all bins are zero, converted to probabilities, appended
        to the daily output file, and the temp file is removed — even if a
        later step fails.
        """
        logger.info("Computing counts and probabilities for water_speed")
        cnt_data = calculate_counts_xhistogram(data, "water_speed", "spd_cnts", "spd_bin",
                                               exclude_last_bin=True)
        cnts_out_path = compute_to_netcdf(cnt_data, "spd_cnts",
                                          output_dir=f"{path_to_source}/temp",
                                          file_name="output_cnts")
        try:
            cnt_data = read_netcdf(cnts_out_path)
            # Cells whose bins sum to zero carry no information; set them to
            # NaN so they do not yield spurious zero probabilities.
            cnt_data = cnt_data.where(~(cnt_data.sum(dim=["spd_bin"]) == 0), other=np.nan)
            prob_cnt = calculate_probabilities(cnt_data, "spd_cnts", "spd_bin")
            compute_to_netcdf(prob_cnt, file_name=f"{self.model_name}-{year_month_day}",
                              output_dir=output_path, mode="a",
                              encoding={"spd_cnts": self.counts_encoding})
        finally:
            # Always clean up the staged counts file (avoids leaking temp
            # files when a downstream step raises).
            os.remove(cnts_out_path)
        logger.info("Computed counts and probabilities for water_speed")

    def _compute_aparms(self, files_in_path, output_path, year_month_day):
        """Compute per-time-step histogram counts for each APARMS variable."""
        logger.info("Starting APARMS file computations")
        # NOTE(review): unlike the statistics filter, no model_name is passed
        # here — confirm filter_files' intended behavior without it.
        aparms_files = filter_files(files_in_path, self.aparms_file_names)
        logger.info("Filtered APARMS files to %d files: %s", len(aparms_files), aparms_files)
        if not aparms_files:
            return
        aparms_data = read_mfnetcdf(aparms_files, chunks={"time": "auto"}, decode_times=False)
        # One dataset (and one output file) per time step.
        times, datasets = zip(*aparms_data.groupby("time"))
        paths = [f"{output_path}/aparms_{year_month_day}_t{int(t):04d}.nc" for t in times]
        for aparm_var, bins in self.aparms_var_data.items():
            logger.info("Computing %s with bins: %s", aparm_var, bins)
            compute_sets = [calculate_counts_xhistogram(d_set, aparm_var,
                                                        f"{aparm_var}Cnts", bins=bins)
                            for d_set in datasets]
            xr.save_mfdataset(compute_sets, paths, mode="a",
                              encoding={f"{aparm_var}Cnts": self.counts_encoding})
            logger.info("Computed %s with %d files.", aparm_var, len(times))

    def check_if_can_run(self):
        """Check whether all inputs for this model/date are available.

        Returns:
            True when every required variable is present; False when no
            source files match the required variables.

        Raises:
            NoFilesReady: If no source files exist for the date at all.
            MissingVariables: If files exist but required variables are
                missing from the dataset.
        """
        _, _, path_to_source, files_in_path = self._source_context()
        logger.info("Checking if computation can run with files: %s", files_in_path)
        if not files_in_path:
            logger.error("No files found in path: %s", path_to_source)
            raise NoFilesReady(f"No files found for {self.model_name} on {self.date}")
        filtered_files = filter_files(files_in_path, self.variables, self.model_name)
        logger.info("Filtered files for required variables: %s", filtered_files)
        if not filtered_files:
            logger.error("No files found for variables: %s", self.variables)
            return False
        data = read_mfnetcdf(filtered_files)
        logger.info("Variables in the dataset: %s", list(data.variables))
        # Verify that every variable to compute exists in the dataset.
        missing_vars = [v for v in self.variables if v not in data.data_vars]
        if missing_vars:
            error = f"Missing the following variables to compute: {missing_vars}"
            logger.error(error)
            raise MissingVariables(error)
        logger.info("All necessary variables are present.")
        return True