Untitled
unknown
plain_text
3 years ago
21 kB
13
Indexable
# -*- coding: utf-8 -*- """ Automatically generated by Colaboratory. # setup """ path = '/squats/' """# imports""" import pickle import pandas as pd import numpy as np import pickle import statsmodels.api as sm from scipy.signal import find_peaks, peak_prominences #dtft from array import * import cmath as cm import random import math from datetime import datetime import torch import torch.nn as nn import torch.nn.init as init from torch import optim import torch.nn.functional as F from torch.nn.utils.rnn import pack_sequence from config.settings.base import ML_DATASET #checking if cuda is avilable if torch.cuda.is_available(): dev = "cuda:0" else: dev = "cpu" device = torch.device(dev) import warnings warnings.filterwarnings("ignore") """# functions""" def left_right_confidence(data): left_facing_con = 0 right_facing_con = 0 if sum(data.iloc[:10]['x_31']) < sum(data.iloc[:10]['x_29']): # facing right score = sum( ( sum(data.iloc[:10]['visibility_31'])/10 , sum(data.iloc[:10]['visibility_29'])/10 ) ) / 2 right_facing_con += score #left_facing_con += (1-score) else: # facing left score = sum( ( sum(data.iloc[:10]['visibility_31'])/10 , sum(data.iloc[:10]['visibility_29'])/10 ) ) / 2 #right_facing_con += (1-score) left_facing_con += score if sum(data.iloc[:10]['x_32']) < sum(data.iloc[:10]['x_30']): # facing right score = sum( ( sum(data.iloc[:10]['visibility_32'])/10 , sum(data.iloc[:10]['visibility_30'])/10 ) ) / 2 right_facing_con += score #left_facing_con += (1-score) else: # facing left score = sum( ( sum(data.iloc[:10]['visibility_32'])/10 , sum(data.iloc[:10]['visibility_30'])/10 ) ) / 2 #right_facing_con += (1-score) left_facing_con += score return left_facing_con < right_facing_con def left_or_right(data): ''' Tell which side the user True, Points list: Right False, Points list: Left ''' select = ['frame'] indices = [] facing = False indices = [0,23,24,25,26] for index in indices: for axis in ('x_','y_','z_'): select.append(axis+str(index)) return facing, select def read_file(file_name): df = pd.read_csv(r''+file_name) #df.set_index('frame') right_left,select = left_or_right(df) return df,right_left,select[1:] def max_min(df,select): mn = 10.0 mx = -10.0 for i in select: mn = min(mn, min(df[i])) mx = max(mx, max(df[i])) return mx,mn def smooth(df,select,norm = True,f = 0): new_df = pd.DataFrame() if f == 0: f = min( 0.03, (41 / len(df)) ) time = [i for i in range(len(df))] all = select for i in all: new_df[i] = sm.nonparametric.lowess(df[i].values, time,frac= f, it=3, delta=0.0, is_sorted=True, missing='drop', return_sorted=False) if norm: x_max, x_min = max_min(new_df,all[0::3]) y_max, y_min = max_min(new_df,all[1::3]) z_max, z_min = max_min(new_df,all[2::3]) # normalise x for i in select[0::3]: new_df[i] = (new_df[i] - x_min) / (x_max - x_min) #normalise y for i in select[1::3]: new_df[i] = 1 - ( (new_df[i] - y_min) / (y_max - y_min) ) #normalise z for i in select[2::3]: new_df[i] = (new_df[i] - z_min) / (z_max - z_min) new_df = new_df[select] return new_df def rel_matrix(send,ad=1): if ad == 0: if send: snd =[#[1,2,3,4], [1,0,0,0],# nose [0,0,1,0],# sholder [0,1,0,0],# hip [0,0,0,1]]# knee return torch.FloatTensor(snd).to(device) else: rec =[#[1,2,3,4], [0,0,0,0],# nose [1,1,0,0],# sholder [0,0,1,1],# hip [0,0,0,0]]# knee return torch.FloatTensor(rec).to(device) elif ad == 1: if send: snd =[#[1,2,3,4,5], [1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# static top [0,0,0,0,0,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# nose [0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# left hip [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0],# right hip [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,0,0,0,0,0],# left knee [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1]]# right knee return torch.FloatTensor(snd).to(device) else: rec =[#[1,2,3,4,5,6], [0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0],# static top [1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0],# nose [0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0],# left hip [0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0],# right hip [0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1],# left knee [0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0]]# right knee return torch.FloatTensor(rec).to(device) elif ad == 2: if send: snd =[#[1,2,3,4,5,6], [1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# static top [0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# nose [0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# left hip [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# right hip [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0],# left knee [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0],# right knee [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1]]# static bottom return torch.FloatTensor(snd).to(device) else: rec =[#[1,2,3,4,5,6], [0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0],# static top [1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0],# nose [0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0],# left hip [0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0],# right hip [0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0],# left knee [0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1],# right knee [0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0]]# static bottom return torch.FloatTensor(rec).to(device) def to_matrix(arr0,arr1,ad,diff,normal,td): lst = [] i = 0 if ad > 0: if diff and normal: if td: lst.append([0.5,1.1,0.,0.]) else: lst.append([0.5,1.1,0.,0.,0.,0.]) elif diff: if td: lst.append([0.,0.]) else: lst.append([0.,0.,0.]) else: if td: lst.append([0.5,1.1]) else: lst.append([0.5,1.1,0.]) while i < len(arr0): d = [] if normal: if td: #2d d += arr1[i:i+2] else: #3d d += arr1[i:i+3] if diff: if td: d += [arr1[i+0]-arr0[i+0], arr1[i+1]-arr0[i+1]] else: d += [arr1[i+0]-arr0[i+0], arr1[i+1]-arr0[i+1], arr1[i+2]-arr0[i+2]] lst.append(d) i += 3 if ad > 1: if diff and normal: if td: lst.append([0.5,-0.1,0.,0.]) else: lst.append([0.5,0.,0.,0.,0.,0.]) elif diff: if td: lst.append([0.,0.]) else: lst.append([0.,0.,0.]) else: if td: lst.append([0.5,-0.1]) else: lst.append([0.5,-0.1,0.]) return lst """### Tranform""" def transform(dct,ad,diff,normal,td): source = [] target = [] for _,item in dct.items(): lst = smooth(item[0],item[2][:],True)[item[2][:]].values.tolist() # lst = item[0][item[2][3:]].values.tolist() for i in range(1,len(lst)-1): source.append( to_matrix(lst[i-1],lst[i],ad,diff,normal,td) ) target.append( to_matrix(lst[i],lst[i+1],ad,diff,normal,td) ) return ( torch.transpose( torch.FloatTensor(source).to(device), 1,2), torch.transpose( torch.FloatTensor(target).to(device), 1,2) ) """# IR ## Network """ class Inter_network(nn.Module): '''Interaction networks''' def __init__(self,rec_rel_mat, send_rel_mat, device, obj_dim, rel_sz, relation_hidden_dim, object_hidden_dim): super(Inter_network, self).__init__() #initalizing parameters self.device = device self.rec_rel_mat = rec_rel_mat #self.rec_rel_mat.requires_grad = True self.send_rel_mat = send_rel_mat #self.send_rel_mat.requires_grad = True self.obj_dim = obj_dim self.num_obj = rec_rel_mat.shape[0] self.num_rel = rec_rel_mat.shape[1] self.rel_sz = rel_sz # relation dimension self.hidden_size_r = relation_hidden_dim self.hidden_size_o = object_hidden_dim self.effect = 50 #self.m1 = torch.tensor([[1.],[1.],[0.],[0.]]).to(self.device) #self.m2 = torch.tensor([[0.],[0.],[1.],[1.]]).to(self.device) # relation if self.rel_sz == 1: self.ra = torch.rand(self.num_rel, self.rel_sz).to(self.device) self.x = torch.rand(self.rel_sz, self.num_obj).to(self.device) else: self.ra = nn.Parameter(torch.zeros(self.num_rel, self.rel_sz)) self.x = nn.Parameter(torch.zeros(self.rel_sz, self.num_obj)) self.relational = nn.Sequential( nn.Linear(self.obj_dim * 2 + self.rel_sz, self.hidden_size_r), nn.ReLU(), nn.Linear(self.hidden_size_r, self.hidden_size_r), nn.ReLU(), nn.Linear(self.hidden_size_r, self.hidden_size_r), nn.ReLU(), nn.Linear(self.hidden_size_r, self.effect) ) self.object = nn.Sequential( nn.Linear(self.obj_dim + self.effect + self.rel_sz, self.hidden_size_o), nn.ReLU(), nn.Linear(self.hidden_size_o, self.obj_dim) ) def forward(self, object_mat): batch_sz = object_mat.shape[0] o_send = torch.transpose(torch.bmm(object_mat,self.send_rel_mat.repeat(batch_sz, 1, 1)),1,2) o_rec = torch.transpose( torch.bmm(object_mat,self.rec_rel_mat.repeat(batch_sz, 1, 1)),1,2) #dif = o_send - o_rec #slope1 = dif[:,:,1]/dif[:,:,0] #slope2 = dif[:,:,3]/dif[:,:,2] #dif = dif * dif #dist1 = torch.bmm(dif,self.m1.repeat(batch_sz,1,1)) #dist2 = torch.bmm(dif,self.m2.repeat(batch_sz,1,1)) output = torch.cat((o_send,o_rec, self.ra.repeat(batch_sz, 1, 1) ),dim = 2) output = torch.transpose(self.relational(output),1,2) output = torch.bmm(output,torch.transpose(self.rec_rel_mat.repeat(batch_sz, 1, 1),1,2)) output = torch.transpose( output,1,2 ) object_mat = torch.transpose( object_mat,1,2 ) output = torch.cat(( output,object_mat, torch.transpose(self.x,0,1).repeat(batch_sz, 1, 1) ),dim = 2) return torch.transpose(self.object(output),1,2) """## Trainer Main model""" class main_model(nn.Module): def __init__(self,rec_rel_mat,send_snd_mat,obj_dim,rel_sz,relation_hidden_dim, object_hidden_dim,device): super(main_model, self).__init__() self.rel_sz = rel_sz self.device = device self.Inter_network = Inter_network(rec_rel_mat, send_snd_mat, device, obj_dim, rel_sz, relation_hidden_dim, object_hidden_dim) def train(self,source,target,reload,f_nm,n_epochs,batch_size = 100,lr = 0.001): # initalizing Adam optimizer and NLLLoss(Negitive log liklyhood loss) optimizer = optim.Adam(self.parameters(), lr = 0.001) criterion = nn.MSELoss() # calculate number of batch iterations n_batches = int(len(target)/batch_size) if reload: # if True, reload the model from saved checkpoint and retrain the model from epoch and batch last stoped training optimizer, start_epoch = self.load_ckp(f_nm, optimizer) #print("Resuming training from epoch {}".format(start_epoch)) else: # else start training from epoch 0 start_epoch = 0 # start time stime = datetime.now().timestamp() train_loss = 0. for i in range(start_epoch,n_epochs): #shuffle r = torch.randperm(source.shape[0]) source = source[r[:, None]].squeeze(1) target = target[r[:, None]].squeeze(1) # initalize batch_loss to 0. batch_loss = 0. for b in range(n_batches): source_batch = source[b*batch_size: (b*batch_size) + batch_size] target_batch = target[b*batch_size: (b*batch_size) + batch_size] # initialize outputs tensor to save output from the training outputs = torch.zeros(batch_size,target_batch.shape[1], target_batch.shape[2], requires_grad=True).to(self.device) # zero the gradient optimizer.zero_grad() outputs = self.Inter_network.forward(source_batch) loss = criterion(outputs,target_batch) #print("single loss",loss.item()) batch_loss += loss.item() # backpropagation loss.backward() optimizer.step() # loss for epoch batch_loss = batch_loss/(n_batches) #print(target_batch,"out=",outputs) #print("batch losss", batch_loss) # show progress to training self.save_ckp(n_epochs,optimizer,f_nm) #train_loss = train_loss / (n_epochs - start_epoch) #return train_loss def reload(self,filename = 'checkpoint.pt'): optimizer = optim.Adam(self.parameters(), lr = 0.001) optimizer, start_epoch = self.load_ckp(filename, optimizer) print("Reloaded model trained for {} epochs.".format(start_epoch)) #Thanks to https://medium.com/analytics-vidhya/saving-and-loading-your-model-to-resume-training-in-pytorch-cb687352fa61 # code have been modified as per the need of this model def save_ckp(self,epoch,optimizer,checkpoint_dir): #save checkpoint to resume training checkpoint = { 'epoch': epoch, 'Inter_network_state_dict': self.Inter_network.state_dict(), 'optimizer': optimizer.state_dict() } torch.save(checkpoint, checkpoint_dir) def load_ckp(self,checkpoint_fpath, optimizer): # reloads the model from the checkpoint checkpoint = torch.load(checkpoint_fpath,map_location=self.device) self.Inter_network.load_state_dict(checkpoint['Inter_network_state_dict']) optimizer.load_state_dict(checkpoint['optimizer']) return ( optimizer, checkpoint['epoch']) def predict(self,source,target= None): if source.dim() == 2: source = source.unsqueeze(0) output = self.Inter_network.forward(source) #output = torch.transpose(output,1,2) loss = nn.MSELoss() if target != None: print('MSE loss over the value {} '.format(loss(output,target))) return output.cpu() #return loss(output,target) """# model """ """### Func""" def plt_dtft_helper(f): # https://gist.github.com/TheRealMentor/018aab68dc4bb55bb8d9a390f657bd1d #Defining DTFT function def dtft(f,pt): output = [0]*n for k in range(n): s = 0 p = 0 for t in range(len(f)): s += f[t] * cm.exp(-1j * pt[k] * t) output[k] = s return output #Calculating the magnitude of DTFT def magnitude(inp,n): output = [0]*n for t in range(0,n): tmp=inp[t] output[t]= math.sqrt(tmp.real**2 + tmp.imag**2) return output #Calculating the phase def phase(inp,n): output = [0]*n for t in range(0,n): tmp=inp[t] output[t]= math.atan2(tmp.imag,tmp.real) return output n = 11 #Defining the x-limits N = 2*((math.pi)/n) x = np.arange(-(math.pi),math.pi,N) x1 = np.fft.fftshift(x) x1 = x1[:n] #Using the function that I made made_func = dtft(f,x) made_func_shift=np.fft.fftshift(made_func) made_func_shift_mag = magnitude(made_func_shift,n) made_func_shift_phs = phase (made_func_shift,n) #Using the inbuilt function #inbuilt = np.fft.fft(f,n) #inbuilt_mag = magnitude(inbuilt,n) #inbuilt_phs = phase (inbuilt,n) return x1, made_func_shift_mag, made_func_shift_phs def get_dtft(mse_n,mse_lh,mse_rh,mse_lk,mse_rk,mse_a,label): data = [] def write(lst): _, mag, phs = plt_dtft_helper(lst) for i in mag: data.append(i) for i in phs: data.append(i) write(mse_n) write(mse_lh) write(mse_rh) write(mse_lk) write(mse_rk) write(mse_a) if label != None: data.append(label) return data def predict(model,scaler_model,classifier_model,source,target): def mse(true,pred): true = torch.transpose(true,0,1)[1:6,:2] pred = torch.transpose(pred,0,1)[1:6,:2] t = pow((true - pred),2) t = t[:,0] + t[:,1] t = t.cpu().clone().detach() tt = sum(t.tolist())/(t.shape[0] * 2) t = (t/2).tolist() mse_n.append(t[0]) mse_lh.append(t[1]) mse_rh.append(t[2]) mse_lk.append(t[3]) mse_rk.append(t[4]) mse_a.append(tt) pre_val = [] mse_n = [] # nose mse_lh = [] # left hip mse_rh = [] # right hip mse_lk = [] # left knee mse_rk = [] # right knee mse_a = [] # all inpt = source[0] for k in range(len(source)): temp = model.predict(inpt) inpt = temp.to(device) mse(target[k],temp[0].to(device).clone().detach()) pre_val.append(temp[0].tolist()) prediction = [] error_sign = [get_dtft(mse_n,mse_lh,mse_rh,mse_lk,mse_rk,mse_a,None)] output = scaler_model.transform(error_sign) prediction.append(classifier_model.predict(output)[0]) return prediction def load_label_mapping(): labels_map = [] file = open(ML_DATASET+path+'label_mapping.csv','r') for line in file.read().strip().split('\n'): line = line.strip().split(',') labels_map.append(line[0]) file.close() return labels_map class pipeline: def __init__(self): self.rel_rec_mat = rel_matrix(False,1) self.rel_snd_mat = rel_matrix(True,1) self.label_mapping = load_label_mapping() self.in_model = main_model(self.rel_rec_mat, self.rel_snd_mat, 4, 20, 150, 150, device).to(device) self.in_model.reload(ML_DATASET+path+"models/in_model.pt") self.scaler_model = pickle.load(open(ML_DATASET+path+'models/scaler_model.sav', 'rb')) self.classifier_model = pickle.load(open(ML_DATASET+path+'models/classifier_model.sav', 'rb')) def predict(self,df): right_left,select = left_or_right(df) dct = {} dct['rep'] = [df,right_left,select[1:]] source,target = transform(dct,1,True,True,True) labels = [] prediction = predict(self.in_model,self.scaler_model,self.classifier_model,source,target) for i in prediction: labels.append(self.label_mapping[i]) return labels
Editor is loading...