Fonctions utiles
unknown
python
3 years ago
24 kB
10
Indexable
import numpy as np from mpl_toolkits.basemap import Basemap from netCDF4 import Dataset as NetCDFFile import matplotlib.pyplot as plt from matplotlib import ticker import skfuzzy as fuzz from skfuzzy import control as ctrl import imageio import os from geopy import distance from scipy.optimize import curve_fit import scipy import similaritymeasures from sewar.full_ref import mse, rmse, psnr, uqi, ssim, ergas, scc, rase, sam, msssim, vifp import math # Pour pouvoir extraire les données: # longitude / latitude / time / d = divergence / r = humidity / t = temperature / u=uwind / v=vwind / vo=vortcity # sp = surface_pressure #Pour déssiner une map pour une variable: def plotNC(path, var, unite, titre,jour,heure): nc = NetCDFFile(path) # Ouverture de la BDD #Extraction des tableaux de données: lat = nc.variables['latitude'][:] lon = nc.variables['longitude'][:] time = nc.variables['time'][:] vorticity = nc.variables['vo'][:] var = nc.variables[var][:] #Ou une autre variable au choix dans la définition de la fonction nc.close() #Création de la map map = Basemap(width=5000000, height=3500000, resolution='l', projection='cyl', llcrnrlon=lon.min(), llcrnrlat=lat.min(), urcrnrlon=lon.max(), urcrnrlat=lat.max(), lat_0=lat.mean(), lon_0=lon.mean()) lons, lats = np.meshgrid(lon, lat) xi, yi = map(lons, lats) map.drawmapboundary(fill_color='aqua') map.fillcontinents(color='coral', lake_color='aqua') map.drawcoastlines() parallels = np.arange(lat.min(), lat.max(), 5.) # make latitude lines every 5 degrees from xx to xx meridians = np.arange(lon.min(), lat.min(), 5.) # make longitude lines every 5 degrees from xx to xx map.drawparallels(parallels, labels=[1, 0, 0, 0], fontsize=5) map.drawmeridians(meridians, labels=[0, 0, 0, 1], fontsize=5) date_exact = (jour-1) * 24 + heure #Date exacte, décalage de -1 jour car la basse de données commence au jour 0 #Création des données en couleur cs = map.pcolor(xi, yi, np.squeeze(var[date_exact, :, :]),vmin = var.min(), vmax = var.max()) cbar = map.colorbar(cs, location='bottom', pad=0.2,) if titre == 'Divergence': tick_locator = ticker.MaxNLocator(nbins=5) cbar.locator = tick_locator cbar.update_ticks() cbar.set_label(unite,size=7) plt.title(titre,fontsize = 10) #Pour déssiner les map #Pour déssiner les map pour toutes les variables, avec export en photos: def plotTC(path): nc = NetCDFFile(path) j = [23,24,25,26,27,28,29,30,31] h = [10,23] vorticity = np.array(nc.variables['vo'][:]) latitude = np.array(nc.variables['latitude'][:]) longitude = np.array(nc.variables['longitude'][:]) # np.shape(vorticity) = (744,93,117) # np.shape(latitude) = (93,) # np.shape(longitude) = (117,) # L'indice sera la latitude et la longitude. Cependant,ce ne sont pas les vraies lat et lon. # Il faut prendre la matrice nc.variable de la latitude puis l'évaluer en l'index trouvé pour avoir la vraie latitude. # Et pareil pour la longitude. time = np.shape(vorticity)[0] x = [] y = [] for i in range(time): a = np.unravel_index(np.argmax(vorticity[i,:,:]), vorticity[i,:,:].shape) #prend le max de la vorticité qui correspond à la position du cyclone latitude_coord, longitude_coord = a[0],a[1] x.append(latitude[latitude_coord]) y.append(longitude[longitude_coord]) #Et pour chaque jour on crée les images: i = 0 for jour in j: for heure in h: plt.rcParams.update({'font.size': 5}) plt.subplot(2, 3, 1) plotNC(path, 'r', '%', 'Humidité relative',jour,heure) plt.subplot(2, 3, 2) plotNC(path, 'd', 'jsp', 'Divergence',jour,heure) plt.subplot(2, 3, 3) plotNC(path, 'vo', 'jsp', 'Vorticité',jour,heure) plt.subplot(2, 3, 4) plotNC(path, 't', 'Kelvin', 'Température',jour,heure) plt.subplot(2, 3, 5) plotNC(path, 'u', 'm/s', 'Uwind',jour,heure) plt.subplot(2, 3, 6) plotNC(path, 'v', 'm/s', 'Vwind',jour,heure) plt.savefig(f"{i}.png", dpi=300) plt.clf() i = i+1 #Pour faire une belle séquence des images: graphs = [] for i in range(0,16): graphs.append(imageio.imread(f"{i}.png")) imageio.mimsave(f"{os. getcwd()}.gif",graphs) plt.show() #Nous donne en sortie la position du TC à un moment donné (Latitude, Longitude) def position_TC(jour_TC, heure_TC, path): nc_TC = NetCDFFile(path) date = (jour_TC-1) * 24 + heure_TC v = np.array(nc_TC.variables['v'][:]) u = np.array(nc_TC.variables['u'][:]) vor = np.array(nc_TC.variables['vo'][:]) lat = np.array(nc_TC.variables['latitude'][:]) lon = np.array(nc_TC.variables['longitude'][:]) u = u[date, :, :] v = v[date, :, :] vor = vor[date, :, :] x = scipy.integrate.cumtrapz(v, lon, axis=1, initial=0) y = scipy.integrate.cumtrapz(u, lat, axis=0, initial=0) psi_vor = (y - x) psi_vor = psi_vor / np.min(psi_vor) + vor / np.min(vor) latitude = np.array(nc_TC.variables['latitude'][:]) longitude = np.array(nc_TC.variables['longitude'][:]) # On trouve la vorticité max à cette date, dans une certaine zone a = np.unravel_index(np.nanargmin(psi_vor), psi_vor.shape) # Puis on envoie les coordonnées # print(f"(Lon,Lat) = {latitude[a[0]], longitude[a[1]]}") nc_TC.close() return latitude[a[0]], longitude[a[1]] def ffposition_TC(jour,heure,path): nc = NetCDFFile(path) date_exact = (jour-1) * 24 + heure vwind = np.array(nc.variables['v'][:]) uwind = np.array(nc.variables['u'][:]) latitude = np.array(nc.variables['latitude'][:]) longitude = np.array(nc.variables['longitude'][:]) vo = np.array(nc.variables['vo'][:]) x = scipy.integrate.cumtrapz(vwind[date_exact, :, :], longitude, axis=1, initial=0) y = scipy.integrate.cumtrapz(uwind[date_exact, :, :], latitude, axis=0, initial=0) psi_vor = (y - x) psi_vor = psi_vor / np.min(psi_vor) + vo / np.min(vo) a = np.unravel_index(np.nanargmin(psi_vor), psi_vor.shape) # Puis on envoie les coordonnées return latitude[a[0]], longitude[a[1]] def fposition_TC(jour, heure, path): nc = NetCDFFile(path) date_exact = (jour - 1) * 24 + heure vorticity = np.array(nc.variables['vo'][:]) latitude = np.array(nc.variables['latitude'][:]) longitude = np.array(nc.variables['longitude'][:]) # On trouve la vorticité max à cette date a = np.unravel_index(np.argmax(vorticity[date_exact, :, :]), vorticity[date_exact, :, :].shape) # Puis on envoie les coordonnées return (latitude[a[0]], longitude[a[1]]) # Return Lat,Lon #Nous donne en sortie le paramètre de coriolis def parametre_coriolis(latitude): latitude = (latitude*np.pi)/180 return 2*0.72921*(10**(-4))*np.sin(latitude) #Nous donne en sortie la taille du Cyclone (diamètre le plus grand de l'ellipse) def taille_TC(jour, heure, seuil, path): nc = NetCDFFile(path) date_exact = (jour - 1) * 24 + heure # np.shape(vorticity) = (744,93,117) # np.shape(latitude) = (93,) # np.shape(longitude) = (117,) vorticity = np.array(nc.variables['vo'][:]) latitude = np.array(nc.variables['latitude'][:]) longitude = np.array(nc.variables['longitude'][:]) # print(longitude[80]) gives -74.0 # Limit longitude study up to -74.0 to the right to ignore high vorticity where there is no cyclone; can be observed on the .gif vo_max = np.unravel_index(np.argmax(vorticity[date_exact, :, 0:74]), vorticity[date_exact, :, 0:74].shape) lon_TC = vo_max[1] lat_TC = vo_max[0] s = seuil vorticity[vorticity < s] = 0 # Calculating min and max lon and lat # Hovering the latitude, fixing longitude of current position lat_vo_min = 0 # Initialising values lat_vo_max = 0 # Initialising values for i in range(93): if vorticity[ date_exact, i, lon_TC] > 0: # Taking the first non zero value index as the start point of the TC latitude lat_vo_min = i break if lat_vo_min == 0: # If lat_vo_min==0 it means that all values were below s lat_vo_max = 0 else: for i in range(lat_vo_min, 93): if vorticity[ date_exact, i, lon_TC] == 0: # Taking the last non zero value index as the end point of the TC latitude lat_vo_max = i - 1 break # Hovering the longitude, fixing latitude of current position lon_vo_min = 0 # Initialising values lon_vo_max = 0 # Initialising values for i in range(117): if vorticity[ date_exact, lat_TC, i] > 0: # Taking the first non zero valueindex as the start point of the TC longitude lon_vo_min = i break if lon_vo_min == 0: lon_vo_max = 0 else: for i in range(lon_vo_min, 93): if vorticity[ date_exact, lat_TC, i] == 0: # Taking the last non zero value index as the end point of the TC longitude lon_vo_max = i - 1 break # print(f"LAT : Min:{lat_vo_min},Max:{lat_vo_max}") # print(f"LON : Min{lon_vo_min},Max:{lon_vo_max}") # --------------------------------------------------------------------------------- lat1, lon1, lat2, lon2, R = lat_vo_min, lon_vo_min, lat_vo_max, lon_vo_max, 6373.0 # Distance between lon_vo_min and lon_vo_max, lat_TC constant coordinates_from = [lat_TC, lon1] coordinates_to = [lat_TC, lon2] distance_geopy = distance.distance(coordinates_from, coordinates_to).km distance_geopy_great_circle = distance.great_circle(coordinates_from, coordinates_to).km longueur_longitudinale = np.rint((distance_geopy + distance_geopy_great_circle) / 2) # print('Longueur longitudinale', longueur_longitudinale) # Distance between lat_vo_min and lat_vo_max, lon_TC constant coordinates_from = [lat1, lon_TC] coordinates_to = [lat2, lon_TC] distance_geopy = distance.distance(coordinates_from, coordinates_to).km distance_geopy_great_circle = distance.great_circle(coordinates_from, coordinates_to).km longueur_latitudinale = np.rint((distance_geopy + distance_geopy_great_circle) / 2) # print('Longueur latitudinale', longueur_latitudinale) # print('AIRE', longueur_latitudinale*longueur_longitudinale,'M2') res = max(longueur_latitudinale, longueur_longitudinale) return res #Calcule le module de la vitesse en tout point à tout moment def calcul_pth(path): nc = NetCDFFile(path) uwind = np.array(nc.variables['u'][:]) vwind = np.array(nc.variables['v'][:]) pth = np.sqrt(np.power(uwind, 2) + np.power(vwind, 2)) return(pth) #Donne en sortie la vitesse du TC (utilise la matrice créée juste avant avec calcul_pth) def vitesse_TC(jour,heure,pth): res = np.max(pth[(jour-1)*24+heure]) return(res) # Pythagore #Donne la sortie la variation de vitesse du TC, à distance d'une heure) def variation_vitesse_TC(jour,heure,pth): rep = vitesse_TC(jour,heure,pth)-vitesse_TC(jour,heure-1,pth) return(rep) #Donne en sortie la pression à la surface où se trouve le TC, à un moment donné def pression_TC(jour,heure,path,pressure_path): nc = NetCDFFile(path) pressure = np.array(NetCDFFile(pressure_path).variables['sp'][:]) date_exact = (jour-1) * 24 + heure vorticity = np.array(nc.variables['vo'][:]) # Puis on envoie les coordonnées lat,lon = position_TC(jour,heure,path) return(np.rint(pressure[date_exact,lat,lon])) def variation_pression_TC(jour,heure,path,pressure_path): return pression_TC(jour,heure,path,pressure_path)-pression_TC(jour,heure-1,path,pressure_path) def relation_pression_taille_TC(path,pression): t = [] taille = [] pressure = [] for i in range(23, 32): for h in range(24): t.append((i - 1) * 24 + h) pressure.append(pression_TC(i, h, path, pression)) taille.append(taille_TC(i, h, 0.0006, path)) fig, ax1 = plt.subplots() color = 'tab:red' ax1.set_xlabel('Temps') ax1.set_ylabel('Pression (Pa)', color=color) ax1.plot(t, pressure, color=color) ax1.tick_params(axis='y', labelcolor=color) ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis color = 'tab:blue' ax2.set_ylabel('Taille (Km)', color=color) # we already handled the x-label with ax1 ax2.plot(t, taille, color=color) ax2.tick_params(axis='y', labelcolor=color) fig.tight_layout() # otherwise the right y-label is slightly clipped plt.show() def relation_pression_vitesse_TC(path,pression): pth = calcul_pth(path) t = [] taille = [] pressure = [] for i in range(23, 32): for h in range(24): t.append((i - 1) * 24 + h) pressure.append(pression_TC(i, h, path, pression)) taille.append(vitesse_TC(i, h, pth)) fig, ax1 = plt.subplots() color = 'tab:red' ax1.set_xlabel('Temps') ax1.set_ylabel('Pression (Pa)', color=color) ax1.plot(t, pressure, color=color) ax1.tick_params(axis='y', labelcolor=color) ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis color = 'tab:blue' ax2.set_ylabel('Vitesse (Km/h)', color=color) # we already handled the x-label with ax1 ax2.plot(t, taille, color=color) ax2.tick_params(axis='y', labelcolor=color) fig.tight_layout() # otherwise the right y-label is slightly clipped plt.show() def regression_trajectoire_TC(debut, fin, path): x_full = [] y_full = [] # Données full for i in range(23, 32): for h in range(24): pos1 = position_TC(i, h, path)[0] pos2 = position_TC(i, h, path)[1] x_full.append(pos1) y_full.append(pos2) # Données entrainement x = [] y = [] for i in range(debut, fin+1): for h in range(24): pos1 = position_TC(i, h, path)[0] pos2 = position_TC(i, h, path)[1] x.append(pos1) y.append(pos2) plt.scatter(y, x) plt.show() def reg_poly(x, y, x_full, y_full): def objective(x, a, b, c): return a * (x ** 2) + b * x + c params = np.polyfit(x,y,2) a = params[0] b = params[1] c = params[2] y_test = [] for pt in x_full: y_test.append(objective(pt, a, b, c)) y_test = np.array(y_test) plt.scatter(y_test, x_full) plt.show() erreur = [] for i in range(len(y_full)): e = (y_full[i] - y_test[i]) ** 2 erreur.append(np.sqrt(e)) print(f"Erreur poly en moyenne:{np.sum(erreur)}") reg_poly(x, y, x_full, y_full) def reg_lin(x, y, x_full): def objective(x, a, b): return a * x + b params = curve_fit(objective, x, y) a, b = params[0] y_test = [] for pt in x_full: y_test.append(objective(pt, a, b)) plt.scatter(y_test, x_full) plt.show() erreur = [] for i in range(len(y_full)): e = (y_full[i] - y_test[i]) ** 2 erreur.append(np.sqrt(e)) print(f"Erreur lin en moyenne:{np.sum(erreur)}") reg_lin(x, y, x_full) def distance_minimale(jour,path): x_full = [] y_full = [] # Données full for i in range(23, 32): for h in range(24): pos1 = position_TC(i, h, path)[0] pos2 = position_TC(i, h, path)[1] x_full.append(pos1) y_full.append(pos2) # Données entrainement x = [] y = [] for i in range(23, jour): for h in range(24): pos1 = position_TC(i, h, path)[0] pos2 = position_TC(i, h, path)[1] x.append(pos1) y.append(pos2) plt.scatter(y,x,label='Entrainement') plt.legend() plt.show() def reg_poly(x, y, x_full, y_full): def objective(x, a, b, c): return a * (x ** 2) + b * x + c params = np.polyfit(x,y,2) a = params[0] b = params[1] c = params[2] y_test = [] for pt in x_full: y_test.append(objective(pt-5, a, b, c)-6) y_test = np.array(y_test) plt.scatter(y_test, x_full, label = "Estimation poly2") plt.scatter(y_full, x_full, label="Real") plt.legend() plt.show() erreur = [] for i in range(len(y_full)): e = (y_full[i] - y_test[i]) ** 2 erreur.append(np.sqrt(e)) print(f"Erreur poly totale:{np.sum(erreur)}") xy_entr_futur=[] for i in range(len(y_full)): if x_full[i]>position_TC(jour,0,path)[0]: pos1 = x_full[i] pos2 = objective(pos1-5,a,b,c)-6 xy_entr_futur.append((pos1,pos2)) return xy_entr_futur def reg_lin(x, y, x_full,y_full): def objective(x, a, b): return a * x + b params = curve_fit(objective, x, y) a, b = params[0] y_test = [] for pt in x_full: y_test.append(objective(pt, -1*a, b)+36) y_test = np.array(y_test) plt.scatter(y_test, x_full, label = "Estimation lin") plt.scatter(y_full, x_full, label="Real") plt.legend() plt.show() erreur = [] for i in range(len(y_full)): e = (y_full[i] - y_test[i]) ** 2 erreur.append(np.sqrt(e)) xy_entr_futur = [] for i in range(len(y_full)): if x_full[i]>position_TC(jour,0,path)[0]: pos1 = x_full[i] pos2 = objective(pos1,-1*a,b)+36 xy_entr_futur.append((pos1,pos2)) print(f"Erreur lin totale:{np.sum(erreur)}") return xy_entr_futur xy_entr_poly = reg_poly(x, y, x_full, y_full) xy_entr_lin = reg_lin(x,y,x_full,y_full) #Soit pos_current la position à l'instant où on arrête les données d'entrainement #Et à partir duquel on souhaite faire les prédictions pos_current = position_TC(jour,0,path) #(Lat,Lon) # ENTRAINEMENT POLY2 temp = [] for i in range(len(xy_entr_poly)): temp.append(pos_current) distance_tous_poly2=[] for i in range(len(xy_entr_poly)): distance_tous_poly2.append(math.dist(xy_entr_poly[i],temp[i])) distance_min_coord_poly2 = distance_tous_poly2.index(min(distance_tous_poly2)) print('Poly2:') print(f"Pt de distance min coord: {distance_min_coord_poly2}") distance_min = xy_entr_poly[distance_min_coord_poly2] print(f"Pt de distance min: {distance_min}") # ENTRAINEMENT LIN temp = [] for i in range(len(xy_entr_lin)): temp.append(pos_current) distance_tous_lin = [] for i in range(len(xy_entr_lin)): distance_tous_lin.append(math.dist(xy_entr_lin[i], temp[i])) distance_min_coord_lin = distance_tous_lin.index(min(distance_tous_lin)) print('Lin:') print(f"Pt de distance min coord: {distance_min_coord_lin}") distance_min = xy_entr_lin[distance_min_coord_lin] print(f"Pt de distance min: {distance_min}") def plot_array(array, lon_plot, lat_plot): map_plot = Basemap(width=5000000, height=3500000, resolution='l', projection='cyl', llcrnrlon=lon_plot.min(), llcrnrlat=lat_plot.min(), urcrnrlon=lon_plot.max(), urcrnrlat=lat_plot.max(), lat_0=lat_plot.mean(), lon_0=lon_plot.mean()) lons, lats = np.meshgrid(lon_plot, lat_plot) xi, yi = map_plot(lons, lats) map_plot.drawmapboundary(fill_color='aqua') map_plot.fillcontinents(color='coral', lake_color='aqua') map_plot.drawcoastlines() cs = map_plot.pcolor(xi, yi, array) cbar = map_plot.colorbar(cs, location='bottom', pad="10%") def met_to_deg(x): return x / 111000 def deg_to_met(x): return x * 111000 def cart_to_polar(x0, y0, x1, y1): # Convertit des coordonnées cartésiennes (x,y) en polaires (r,theta), dans le repère (o,x0,y0) # Cette fonction fonctionne aussi avec des matrices x0 = met_to_deg(x0) y0 = met_to_deg(y0) x1 = met_to_deg(x1) y1 = met_to_deg(y1) x_ref = x1 - x0 y_ref = y1 - y0 x_ref, y_ref = np.meshgrid(x_ref, y_ref) r = np.sqrt(deg_to_met(x_ref) ** 2 + deg_to_met(y_ref) ** 2) angle = np.arctan2(x_ref, y_ref) return r, angle def polar_to_cart(r,angle): angle = np.radians(angle) x = r*np.cos(angle) y = r*np.sin(angle) return x,y def angle(jour,heure,data_path): # print(position_TC(jour, heure-12, data_path)) # print(position_TC(jour, heure, data_path)) nc = NetCDFFile(data_path) date_exact = (jour - 1) * 24 + heure u = np.array(nc.variables['u'][:]) lon = np.array(nc.variables['longitude'][:]) lat = np.array(nc.variables['latitude'][:]) y0, x0 = position_TC(jour, heure-6, data_path) y1, x1 = position_TC(jour, heure, data_path) y0 = deg_to_met(y0) x0 = deg_to_met(x0) y1 = deg_to_met(y1) x1 = deg_to_met(x1) r, angle = cart_to_polar(x0, y0, x1, y1) angle = np.degrees(angle[0][0]) # plot_array(u[date_exact,:,:],lon,lat) # plt.plot(met_to_deg(x0), met_to_deg(y0), ms=5, marker='o', markeredgecolor='red') # plt.plot(met_to_deg(x1), met_to_deg(y1), ms=8, marker='x', markeredgecolor='green') # plt.show() return np.round(angle, decimals = 1) def entree_SF4(jour,heure,data_path): return angle(jour,heure,data_path)-angle(jour,heure-6,data_path) def xy_to_indx_lonlat(x, y, lon_map, lat_map): # On prend l'index ou (x,y)=(lon,lat), avec une tolérance de 25 km, # sachant que l'écart entre 2 valeurs de (lon,lat) est d'env 25 km aussi idx_lon = np.where(np.isclose(lon_map, x, atol=0.5))[-1][-1] # sachant que la résolution est de env 25000 km idx_lat = np.where(np.isclose(lat_map, y, atol=0.5))[-1][-1] return idx_lon, idx_lat def output_fuzz(csq,output_var): output = np.zeros(1) first_rule = True for membi in output_var.terms: csq_i = '['+str(output_var.terms[membi])+']' if csq[csq_i] != 0: x_csq = fuzz.fuzzymath.interp_universe(output_var.universe,output_var.terms[membi].mf , 1) if first_rule: output = np.array([[np.max(x_csq),csq[csq_i]]]) first_rule = False else: output = np.append(output,[[(np.min(x_csq)),csq[csq_i]]],axis=0) return output
Editor is loading...