Fonctions utiles
unknown
python
3 years ago
24 kB
16
Indexable
import numpy as np
from mpl_toolkits.basemap import Basemap
from netCDF4 import Dataset as NetCDFFile
import matplotlib.pyplot as plt
from matplotlib import ticker
import skfuzzy as fuzz
from skfuzzy import control as ctrl
import imageio
import os
from geopy import distance
from scipy.optimize import curve_fit
import scipy
import similaritymeasures
from sewar.full_ref import mse, rmse, psnr, uqi, ssim, ergas, scc, rase, sam, msssim, vifp
import math
# Pour pouvoir extraire les données:
# longitude / latitude / time / d = divergence / r = humidity / t = temperature / u=uwind / v=vwind / vo=vortcity
# sp = surface_pressure
#Pour déssiner une map pour une variable:
def plotNC(path, var, unite, titre,jour,heure):
nc = NetCDFFile(path) # Ouverture de la BDD
#Extraction des tableaux de données:
lat = nc.variables['latitude'][:]
lon = nc.variables['longitude'][:]
time = nc.variables['time'][:]
vorticity = nc.variables['vo'][:]
var = nc.variables[var][:] #Ou une autre variable au choix dans la définition de la fonction
nc.close()
#Création de la map
map = Basemap(width=5000000, height=3500000,
resolution='l', projection='cyl',
llcrnrlon=lon.min(), llcrnrlat=lat.min(), urcrnrlon=lon.max(), urcrnrlat=lat.max(), lat_0=lat.mean(),
lon_0=lon.mean())
lons, lats = np.meshgrid(lon, lat)
xi, yi = map(lons, lats)
map.drawmapboundary(fill_color='aqua')
map.fillcontinents(color='coral', lake_color='aqua')
map.drawcoastlines()
parallels = np.arange(lat.min(), lat.max(), 5.) # make latitude lines every 5 degrees from xx to xx
meridians = np.arange(lon.min(), lat.min(), 5.) # make longitude lines every 5 degrees from xx to xx
map.drawparallels(parallels, labels=[1, 0, 0, 0], fontsize=5)
map.drawmeridians(meridians, labels=[0, 0, 0, 1], fontsize=5)
date_exact = (jour-1) * 24 + heure #Date exacte, décalage de -1 jour car la basse de données commence au jour 0
#Création des données en couleur
cs = map.pcolor(xi, yi, np.squeeze(var[date_exact, :, :]),vmin = var.min(), vmax = var.max())
cbar = map.colorbar(cs, location='bottom', pad=0.2,)
if titre == 'Divergence':
tick_locator = ticker.MaxNLocator(nbins=5)
cbar.locator = tick_locator
cbar.update_ticks()
cbar.set_label(unite,size=7)
plt.title(titre,fontsize = 10) #Pour déssiner les map
#Pour déssiner les map pour toutes les variables, avec export en photos:
def plotTC(path):
nc = NetCDFFile(path)
j = [23,24,25,26,27,28,29,30,31]
h = [10,23]
vorticity = np.array(nc.variables['vo'][:])
latitude = np.array(nc.variables['latitude'][:])
longitude = np.array(nc.variables['longitude'][:])
# np.shape(vorticity) = (744,93,117)
# np.shape(latitude) = (93,)
# np.shape(longitude) = (117,)
# L'indice sera la latitude et la longitude. Cependant,ce ne sont pas les vraies lat et lon.
# Il faut prendre la matrice nc.variable de la latitude puis l'évaluer en l'index trouvé pour avoir la vraie latitude.
# Et pareil pour la longitude.
time = np.shape(vorticity)[0]
x = []
y = []
for i in range(time):
a = np.unravel_index(np.argmax(vorticity[i,:,:]), vorticity[i,:,:].shape) #prend le max de la vorticité qui correspond à la position du cyclone
latitude_coord, longitude_coord = a[0],a[1]
x.append(latitude[latitude_coord])
y.append(longitude[longitude_coord])
#Et pour chaque jour on crée les images:
i = 0
for jour in j:
for heure in h:
plt.rcParams.update({'font.size': 5})
plt.subplot(2, 3, 1)
plotNC(path, 'r', '%', 'Humidité relative',jour,heure)
plt.subplot(2, 3, 2)
plotNC(path, 'd', 'jsp', 'Divergence',jour,heure)
plt.subplot(2, 3, 3)
plotNC(path, 'vo', 'jsp', 'Vorticité',jour,heure)
plt.subplot(2, 3, 4)
plotNC(path, 't', 'Kelvin', 'Température',jour,heure)
plt.subplot(2, 3, 5)
plotNC(path, 'u', 'm/s', 'Uwind',jour,heure)
plt.subplot(2, 3, 6)
plotNC(path, 'v', 'm/s', 'Vwind',jour,heure)
plt.savefig(f"{i}.png", dpi=300)
plt.clf()
i = i+1
#Pour faire une belle séquence des images:
graphs = []
for i in range(0,16):
graphs.append(imageio.imread(f"{i}.png"))
imageio.mimsave(f"{os. getcwd()}.gif",graphs)
plt.show()
#Nous donne en sortie la position du TC à un moment donné (Latitude, Longitude)
def position_TC(jour_TC, heure_TC, path):
nc_TC = NetCDFFile(path)
date = (jour_TC-1) * 24 + heure_TC
v = np.array(nc_TC.variables['v'][:])
u = np.array(nc_TC.variables['u'][:])
vor = np.array(nc_TC.variables['vo'][:])
lat = np.array(nc_TC.variables['latitude'][:])
lon = np.array(nc_TC.variables['longitude'][:])
u = u[date, :, :]
v = v[date, :, :]
vor = vor[date, :, :]
x = scipy.integrate.cumtrapz(v, lon, axis=1, initial=0)
y = scipy.integrate.cumtrapz(u, lat, axis=0, initial=0)
psi_vor = (y - x)
psi_vor = psi_vor / np.min(psi_vor) + vor / np.min(vor)
latitude = np.array(nc_TC.variables['latitude'][:])
longitude = np.array(nc_TC.variables['longitude'][:])
# On trouve la vorticité max à cette date, dans une certaine zone
a = np.unravel_index(np.nanargmin(psi_vor), psi_vor.shape)
# Puis on envoie les coordonnées
# print(f"(Lon,Lat) = {latitude[a[0]], longitude[a[1]]}")
nc_TC.close()
return latitude[a[0]], longitude[a[1]]
def ffposition_TC(jour,heure,path):
nc = NetCDFFile(path)
date_exact = (jour-1) * 24 + heure
vwind = np.array(nc.variables['v'][:])
uwind = np.array(nc.variables['u'][:])
latitude = np.array(nc.variables['latitude'][:])
longitude = np.array(nc.variables['longitude'][:])
vo = np.array(nc.variables['vo'][:])
x = scipy.integrate.cumtrapz(vwind[date_exact, :, :], longitude, axis=1, initial=0)
y = scipy.integrate.cumtrapz(uwind[date_exact, :, :], latitude, axis=0, initial=0)
psi_vor = (y - x)
psi_vor = psi_vor / np.min(psi_vor) + vo / np.min(vo)
a = np.unravel_index(np.nanargmin(psi_vor), psi_vor.shape)
# Puis on envoie les coordonnées
return latitude[a[0]], longitude[a[1]]
def fposition_TC(jour, heure, path):
nc = NetCDFFile(path)
date_exact = (jour - 1) * 24 + heure
vorticity = np.array(nc.variables['vo'][:])
latitude = np.array(nc.variables['latitude'][:])
longitude = np.array(nc.variables['longitude'][:])
# On trouve la vorticité max à cette date
a = np.unravel_index(np.argmax(vorticity[date_exact, :, :]), vorticity[date_exact, :, :].shape)
# Puis on envoie les coordonnées
return (latitude[a[0]], longitude[a[1]]) # Return Lat,Lon
#Nous donne en sortie le paramètre de coriolis
def parametre_coriolis(latitude):
latitude = (latitude*np.pi)/180
return 2*0.72921*(10**(-4))*np.sin(latitude)
#Nous donne en sortie la taille du Cyclone (diamètre le plus grand de l'ellipse)
def taille_TC(jour, heure, seuil, path):
nc = NetCDFFile(path)
date_exact = (jour - 1) * 24 + heure
# np.shape(vorticity) = (744,93,117)
# np.shape(latitude) = (93,)
# np.shape(longitude) = (117,)
vorticity = np.array(nc.variables['vo'][:])
latitude = np.array(nc.variables['latitude'][:])
longitude = np.array(nc.variables['longitude'][:])
# print(longitude[80]) gives -74.0
# Limit longitude study up to -74.0 to the right to ignore high vorticity where there is no cyclone; can be observed on the .gif
vo_max = np.unravel_index(np.argmax(vorticity[date_exact, :, 0:74]), vorticity[date_exact, :, 0:74].shape)
lon_TC = vo_max[1]
lat_TC = vo_max[0]
s = seuil
vorticity[vorticity < s] = 0
# Calculating min and max lon and lat
# Hovering the latitude, fixing longitude of current position
lat_vo_min = 0 # Initialising values
lat_vo_max = 0 # Initialising values
for i in range(93):
if vorticity[
date_exact, i, lon_TC] > 0: # Taking the first non zero value index as the start point of the TC latitude
lat_vo_min = i
break
if lat_vo_min == 0: # If lat_vo_min==0 it means that all values were below s
lat_vo_max = 0
else:
for i in range(lat_vo_min, 93):
if vorticity[
date_exact, i, lon_TC] == 0: # Taking the last non zero value index as the end point of the TC latitude
lat_vo_max = i - 1
break
# Hovering the longitude, fixing latitude of current position
lon_vo_min = 0 # Initialising values
lon_vo_max = 0 # Initialising values
for i in range(117):
if vorticity[
date_exact, lat_TC, i] > 0: # Taking the first non zero valueindex as the start point of the TC longitude
lon_vo_min = i
break
if lon_vo_min == 0:
lon_vo_max = 0
else:
for i in range(lon_vo_min, 93):
if vorticity[
date_exact, lat_TC, i] == 0: # Taking the last non zero value index as the end point of the TC longitude
lon_vo_max = i - 1
break
# print(f"LAT : Min:{lat_vo_min},Max:{lat_vo_max}")
# print(f"LON : Min{lon_vo_min},Max:{lon_vo_max}")
# ---------------------------------------------------------------------------------
lat1, lon1, lat2, lon2, R = lat_vo_min, lon_vo_min, lat_vo_max, lon_vo_max, 6373.0
# Distance between lon_vo_min and lon_vo_max, lat_TC constant
coordinates_from = [lat_TC, lon1]
coordinates_to = [lat_TC, lon2]
distance_geopy = distance.distance(coordinates_from, coordinates_to).km
distance_geopy_great_circle = distance.great_circle(coordinates_from, coordinates_to).km
longueur_longitudinale = np.rint((distance_geopy + distance_geopy_great_circle) / 2)
# print('Longueur longitudinale', longueur_longitudinale)
# Distance between lat_vo_min and lat_vo_max, lon_TC constant
coordinates_from = [lat1, lon_TC]
coordinates_to = [lat2, lon_TC]
distance_geopy = distance.distance(coordinates_from, coordinates_to).km
distance_geopy_great_circle = distance.great_circle(coordinates_from, coordinates_to).km
longueur_latitudinale = np.rint((distance_geopy + distance_geopy_great_circle) / 2)
# print('Longueur latitudinale', longueur_latitudinale)
# print('AIRE', longueur_latitudinale*longueur_longitudinale,'M2')
res = max(longueur_latitudinale, longueur_longitudinale)
return res
#Calcule le module de la vitesse en tout point à tout moment
def calcul_pth(path):
nc = NetCDFFile(path)
uwind = np.array(nc.variables['u'][:])
vwind = np.array(nc.variables['v'][:])
pth = np.sqrt(np.power(uwind, 2) + np.power(vwind, 2))
return(pth)
#Donne en sortie la vitesse du TC (utilise la matrice créée juste avant avec calcul_pth)
def vitesse_TC(jour,heure,pth):
res = np.max(pth[(jour-1)*24+heure])
return(res) # Pythagore
#Donne la sortie la variation de vitesse du TC, à distance d'une heure)
def variation_vitesse_TC(jour,heure,pth):
rep = vitesse_TC(jour,heure,pth)-vitesse_TC(jour,heure-1,pth)
return(rep)
#Donne en sortie la pression à la surface où se trouve le TC, à un moment donné
def pression_TC(jour,heure,path,pressure_path):
nc = NetCDFFile(path)
pressure = np.array(NetCDFFile(pressure_path).variables['sp'][:])
date_exact = (jour-1) * 24 + heure
vorticity = np.array(nc.variables['vo'][:])
# Puis on envoie les coordonnées
lat,lon = position_TC(jour,heure,path)
return(np.rint(pressure[date_exact,lat,lon]))
def variation_pression_TC(jour,heure,path,pressure_path):
return pression_TC(jour,heure,path,pressure_path)-pression_TC(jour,heure-1,path,pressure_path)
def relation_pression_taille_TC(path,pression):
t = []
taille = []
pressure = []
for i in range(23, 32):
for h in range(24):
t.append((i - 1) * 24 + h)
pressure.append(pression_TC(i, h, path, pression))
taille.append(taille_TC(i, h, 0.0006, path))
fig, ax1 = plt.subplots()
color = 'tab:red'
ax1.set_xlabel('Temps')
ax1.set_ylabel('Pression (Pa)', color=color)
ax1.plot(t, pressure, color=color)
ax1.tick_params(axis='y', labelcolor=color)
ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis
color = 'tab:blue'
ax2.set_ylabel('Taille (Km)', color=color) # we already handled the x-label with ax1
ax2.plot(t, taille, color=color)
ax2.tick_params(axis='y', labelcolor=color)
fig.tight_layout() # otherwise the right y-label is slightly clipped
plt.show()
def relation_pression_vitesse_TC(path,pression):
pth = calcul_pth(path)
t = []
taille = []
pressure = []
for i in range(23, 32):
for h in range(24):
t.append((i - 1) * 24 + h)
pressure.append(pression_TC(i, h, path, pression))
taille.append(vitesse_TC(i, h, pth))
fig, ax1 = plt.subplots()
color = 'tab:red'
ax1.set_xlabel('Temps')
ax1.set_ylabel('Pression (Pa)', color=color)
ax1.plot(t, pressure, color=color)
ax1.tick_params(axis='y', labelcolor=color)
ax2 = ax1.twinx() # instantiate a second axes that shares the same x-axis
color = 'tab:blue'
ax2.set_ylabel('Vitesse (Km/h)', color=color) # we already handled the x-label with ax1
ax2.plot(t, taille, color=color)
ax2.tick_params(axis='y', labelcolor=color)
fig.tight_layout() # otherwise the right y-label is slightly clipped
plt.show()
def regression_trajectoire_TC(debut, fin, path):
x_full = []
y_full = []
# Données full
for i in range(23, 32):
for h in range(24):
pos1 = position_TC(i, h, path)[0]
pos2 = position_TC(i, h, path)[1]
x_full.append(pos1)
y_full.append(pos2)
# Données entrainement
x = []
y = []
for i in range(debut, fin+1):
for h in range(24):
pos1 = position_TC(i, h, path)[0]
pos2 = position_TC(i, h, path)[1]
x.append(pos1)
y.append(pos2)
plt.scatter(y, x)
plt.show()
def reg_poly(x, y, x_full, y_full):
def objective(x, a, b, c):
return a * (x ** 2) + b * x + c
params = np.polyfit(x,y,2)
a = params[0]
b = params[1]
c = params[2]
y_test = []
for pt in x_full:
y_test.append(objective(pt, a, b, c))
y_test = np.array(y_test)
plt.scatter(y_test, x_full)
plt.show()
erreur = []
for i in range(len(y_full)):
e = (y_full[i] - y_test[i]) ** 2
erreur.append(np.sqrt(e))
print(f"Erreur poly en moyenne:{np.sum(erreur)}")
reg_poly(x, y, x_full, y_full)
def reg_lin(x, y, x_full):
def objective(x, a, b):
return a * x + b
params = curve_fit(objective, x, y)
a, b = params[0]
y_test = []
for pt in x_full:
y_test.append(objective(pt, a, b))
plt.scatter(y_test, x_full)
plt.show()
erreur = []
for i in range(len(y_full)):
e = (y_full[i] - y_test[i]) ** 2
erreur.append(np.sqrt(e))
print(f"Erreur lin en moyenne:{np.sum(erreur)}")
reg_lin(x, y, x_full)
def distance_minimale(jour,path):
x_full = []
y_full = []
# Données full
for i in range(23, 32):
for h in range(24):
pos1 = position_TC(i, h, path)[0]
pos2 = position_TC(i, h, path)[1]
x_full.append(pos1)
y_full.append(pos2)
# Données entrainement
x = []
y = []
for i in range(23, jour):
for h in range(24):
pos1 = position_TC(i, h, path)[0]
pos2 = position_TC(i, h, path)[1]
x.append(pos1)
y.append(pos2)
plt.scatter(y,x,label='Entrainement')
plt.legend()
plt.show()
def reg_poly(x, y, x_full, y_full):
def objective(x, a, b, c):
return a * (x ** 2) + b * x + c
params = np.polyfit(x,y,2)
a = params[0]
b = params[1]
c = params[2]
y_test = []
for pt in x_full:
y_test.append(objective(pt-5, a, b, c)-6)
y_test = np.array(y_test)
plt.scatter(y_test, x_full, label = "Estimation poly2")
plt.scatter(y_full, x_full, label="Real")
plt.legend()
plt.show()
erreur = []
for i in range(len(y_full)):
e = (y_full[i] - y_test[i]) ** 2
erreur.append(np.sqrt(e))
print(f"Erreur poly totale:{np.sum(erreur)}")
xy_entr_futur=[]
for i in range(len(y_full)):
if x_full[i]>position_TC(jour,0,path)[0]:
pos1 = x_full[i]
pos2 = objective(pos1-5,a,b,c)-6
xy_entr_futur.append((pos1,pos2))
return xy_entr_futur
def reg_lin(x, y, x_full,y_full):
def objective(x, a, b):
return a * x + b
params = curve_fit(objective, x, y)
a, b = params[0]
y_test = []
for pt in x_full:
y_test.append(objective(pt, -1*a, b)+36)
y_test = np.array(y_test)
plt.scatter(y_test, x_full, label = "Estimation lin")
plt.scatter(y_full, x_full, label="Real")
plt.legend()
plt.show()
erreur = []
for i in range(len(y_full)):
e = (y_full[i] - y_test[i]) ** 2
erreur.append(np.sqrt(e))
xy_entr_futur = []
for i in range(len(y_full)):
if x_full[i]>position_TC(jour,0,path)[0]:
pos1 = x_full[i]
pos2 = objective(pos1,-1*a,b)+36
xy_entr_futur.append((pos1,pos2))
print(f"Erreur lin totale:{np.sum(erreur)}")
return xy_entr_futur
xy_entr_poly = reg_poly(x, y, x_full, y_full)
xy_entr_lin = reg_lin(x,y,x_full,y_full)
#Soit pos_current la position à l'instant où on arrête les données d'entrainement
#Et à partir duquel on souhaite faire les prédictions
pos_current = position_TC(jour,0,path) #(Lat,Lon)
# ENTRAINEMENT POLY2
temp = []
for i in range(len(xy_entr_poly)):
temp.append(pos_current)
distance_tous_poly2=[]
for i in range(len(xy_entr_poly)):
distance_tous_poly2.append(math.dist(xy_entr_poly[i],temp[i]))
distance_min_coord_poly2 = distance_tous_poly2.index(min(distance_tous_poly2))
print('Poly2:')
print(f"Pt de distance min coord: {distance_min_coord_poly2}")
distance_min = xy_entr_poly[distance_min_coord_poly2]
print(f"Pt de distance min: {distance_min}")
# ENTRAINEMENT LIN
temp = []
for i in range(len(xy_entr_lin)):
temp.append(pos_current)
distance_tous_lin = []
for i in range(len(xy_entr_lin)):
distance_tous_lin.append(math.dist(xy_entr_lin[i], temp[i]))
distance_min_coord_lin = distance_tous_lin.index(min(distance_tous_lin))
print('Lin:')
print(f"Pt de distance min coord: {distance_min_coord_lin}")
distance_min = xy_entr_lin[distance_min_coord_lin]
print(f"Pt de distance min: {distance_min}")
def plot_array(array, lon_plot, lat_plot):
map_plot = Basemap(width=5000000, height=3500000,
resolution='l', projection='cyl',
llcrnrlon=lon_plot.min(), llcrnrlat=lat_plot.min(), urcrnrlon=lon_plot.max(),
urcrnrlat=lat_plot.max(), lat_0=lat_plot.mean(),
lon_0=lon_plot.mean())
lons, lats = np.meshgrid(lon_plot, lat_plot)
xi, yi = map_plot(lons, lats)
map_plot.drawmapboundary(fill_color='aqua')
map_plot.fillcontinents(color='coral', lake_color='aqua')
map_plot.drawcoastlines()
cs = map_plot.pcolor(xi, yi, array)
cbar = map_plot.colorbar(cs, location='bottom', pad="10%")
def met_to_deg(x):
return x / 111000
def deg_to_met(x):
return x * 111000
def cart_to_polar(x0, y0, x1, y1):
# Convertit des coordonnées cartésiennes (x,y) en polaires (r,theta), dans le repère (o,x0,y0)
# Cette fonction fonctionne aussi avec des matrices
x0 = met_to_deg(x0)
y0 = met_to_deg(y0)
x1 = met_to_deg(x1)
y1 = met_to_deg(y1)
x_ref = x1 - x0
y_ref = y1 - y0
x_ref, y_ref = np.meshgrid(x_ref, y_ref)
r = np.sqrt(deg_to_met(x_ref) ** 2 + deg_to_met(y_ref) ** 2)
angle = np.arctan2(x_ref, y_ref)
return r, angle
def polar_to_cart(r,angle):
angle = np.radians(angle)
x = r*np.cos(angle)
y = r*np.sin(angle)
return x,y
def angle(jour,heure,data_path):
# print(position_TC(jour, heure-12, data_path))
# print(position_TC(jour, heure, data_path))
nc = NetCDFFile(data_path)
date_exact = (jour - 1) * 24 + heure
u = np.array(nc.variables['u'][:])
lon = np.array(nc.variables['longitude'][:])
lat = np.array(nc.variables['latitude'][:])
y0, x0 = position_TC(jour, heure-6, data_path)
y1, x1 = position_TC(jour, heure, data_path)
y0 = deg_to_met(y0)
x0 = deg_to_met(x0)
y1 = deg_to_met(y1)
x1 = deg_to_met(x1)
r, angle = cart_to_polar(x0, y0, x1, y1)
angle = np.degrees(angle[0][0])
# plot_array(u[date_exact,:,:],lon,lat)
# plt.plot(met_to_deg(x0), met_to_deg(y0), ms=5, marker='o', markeredgecolor='red')
# plt.plot(met_to_deg(x1), met_to_deg(y1), ms=8, marker='x', markeredgecolor='green')
# plt.show()
return np.round(angle, decimals = 1)
def entree_SF4(jour,heure,data_path):
return angle(jour,heure,data_path)-angle(jour,heure-6,data_path)
def xy_to_indx_lonlat(x, y, lon_map, lat_map):
# On prend l'index ou (x,y)=(lon,lat), avec une tolérance de 25 km,
# sachant que l'écart entre 2 valeurs de (lon,lat) est d'env 25 km aussi
idx_lon = np.where(np.isclose(lon_map, x, atol=0.5))[-1][-1] # sachant que la résolution est de env 25000 km
idx_lat = np.where(np.isclose(lat_map, y, atol=0.5))[-1][-1]
return idx_lon, idx_lat
def output_fuzz(csq,output_var):
output = np.zeros(1)
first_rule = True
for membi in output_var.terms:
csq_i = '['+str(output_var.terms[membi])+']'
if csq[csq_i] != 0:
x_csq = fuzz.fuzzymath.interp_universe(output_var.universe,output_var.terms[membi].mf , 1)
if first_rule:
output = np.array([[np.max(x_csq),csq[csq_i]]])
first_rule = False
else:
output = np.append(output,[[(np.min(x_csq)),csq[csq_i]]],axis=0)
return output
Editor is loading...