# Paste-site header (not part of the program): Untitled | unknown | plain_text | 3 years ago | 21 kB | 15 | Indexable
# -*- coding: utf-8 -*-
"""
Automatically generated by Colaboratory.
# setup
"""
# Sub-path appended to ML_DATASET when locating the label mapping file and
# the saved model files further down in this module.
path = '/squats/'
"""# imports"""
import pickle
import pandas as pd
import numpy as np
import pickle
import statsmodels.api as sm
from scipy.signal import find_peaks, peak_prominences
#dtft
from array import *
import cmath as cm
import random
import math
from datetime import datetime
import torch
import torch.nn as nn
import torch.nn.init as init
from torch import optim
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_sequence
from config.settings.base import ML_DATASET
# Use the first CUDA device when one is available, otherwise fall back to CPU.
dev = "cuda:0" if torch.cuda.is_available() else "cpu"
device = torch.device(dev)
import warnings
warnings.filterwarnings("ignore")
"""# functions"""
def left_right_confidence(data):
    """Vote on the facing direction using the first ten rows of ``data``.

    Two landmark pairs are inspected: (31, 29) and (32, 30). For each pair the
    summed ``x_`` column of the first landmark is compared with that of the
    second. When the first sum is smaller the pair votes "right", otherwise
    "left". Each vote is weighted by the average of the two landmarks'
    visibility sums divided by ten.

    Args:
        data: DataFrame with ``x_29``..``x_32`` and
            ``visibility_29``..``visibility_32`` columns.

    Returns:
        True when the accumulated "right" weight exceeds the "left" weight.
    """
    head = data.iloc[:10]
    left_weight = 0
    right_weight = 0
    for first, second in (('31', '29'), ('32', '30')):
        # The sums are divided by 10 even when fewer than ten rows exist.
        weight = sum((sum(head['visibility_' + first]) / 10,
                      sum(head['visibility_' + second]) / 10)) / 2
        if sum(head['x_' + first]) < sum(head['x_' + second]):
            right_weight += weight
        else:
            left_weight += weight
    return left_weight < right_weight
def left_or_right(data):
    '''
    Return the facing flag and the list of columns to select.

    The flag is documented as True for right and False for left, but this
    implementation does not inspect ``data`` and always returns False.
    The column list starts with 'frame' followed by the x_/y_/z_ columns of
    landmarks 0, 23, 24, 25 and 26, in that order.
    '''
    facing = False
    landmark_ids = (0, 23, 24, 25, 26)
    select = ['frame'] + [
        axis + str(landmark)
        for landmark in landmark_ids
        for axis in ('x_', 'y_', 'z_')
    ]
    return facing, select
def read_file(file_name):
    """Read a CSV file and pair it with the facing flag and coordinate columns.

    Returns:
        (DataFrame, facing flag, column list without the leading 'frame').
    """
    frame_table = pd.read_csv(r'' + file_name)
    #frame_table.set_index('frame')
    facing, columns = left_or_right(frame_table)
    coordinate_columns = columns[1:]  # drop the leading 'frame' entry
    return frame_table, facing, coordinate_columns
def max_min(df, select):
    """Return ``(maximum, minimum)`` over all values in the given columns.

    The running extremes start at -inf / +inf. The previous fixed -10.0 / 10.0
    sentinels silently produced a wrong result whenever every value was above
    10 or below -10.

    Args:
        df: mapping from column name to an iterable of numbers (for example a
            pandas DataFrame).
        select: iterable of column names to scan.

    Returns:
        Tuple ``(mx, mn)``. With an empty ``select`` this is ``(-inf, inf)``.

    Raises:
        ValueError: if a selected column is empty (raised by ``min``/``max``).
    """
    mn = float("inf")
    mx = float("-inf")
    for column in select:
        values = df[column]
        mn = min(mn, min(values))
        mx = max(mx, max(values))
    return mx, mn
def smooth(df, select, norm=True, f=0):
    """LOWESS-smooth the selected columns and optionally rescale them.

    Args:
        df: DataFrame holding the columns named in ``select``.
        select: column names; every third entry starting at 0, 1 and 2 is
            treated as an x, y and z column respectively.
        norm: when true, min-max scale each axis group using the extremes
            taken over that whole group; the y group is additionally flipped
            (``1 - scaled``).
        f: LOWESS ``frac``; 0 means ``min(0.03, 41 / len(df))``.

    Returns:
        A new DataFrame containing only ``select`` columns, in that order.
    """
    if f == 0:
        f = min(0.03, (41 / len(df)))
    frame_index = list(range(len(df)))
    columns = select

    smoothed = pd.DataFrame()
    for column in columns:
        smoothed[column] = sm.nonparametric.lowess(
            df[column].values, frame_index, frac=f,
            it=3, delta=0.0, is_sorted=True,
            missing='drop', return_sorted=False)

    if norm:
        # Extremes are taken over the smoothed data before any column is rescaled.
        x_max, x_min = max_min(smoothed, columns[0::3])
        y_max, y_min = max_min(smoothed, columns[1::3])
        z_max, z_min = max_min(smoothed, columns[2::3])
        x_span = x_max - x_min
        y_span = y_max - y_min
        z_span = z_max - z_min
        for column in select[0::3]:
            smoothed[column] = (smoothed[column] - x_min) / x_span
        for column in select[1::3]:
            # y is inverted after scaling
            smoothed[column] = 1 - ((smoothed[column] - y_min) / y_span)
        for column in select[2::3]:
            smoothed[column] = (smoothed[column] - z_min) / z_span

    return smoothed[select]
def rel_matrix(send, ad=1):
    """Build the sender or receiver relation matrix as a float tensor on ``device``.

    Rows are objects and columns are relations; entry (o, r) is 1 when
    object ``o`` is the sender (``send`` truthy) or the receiver (``send``
    falsy) of relation ``r``.

    Args:
        send: truthy for the sender matrix, falsy for the receiver matrix.
        ad: layout selector.
            0 - hand-written 4-object / 4-relation layout
                (nose, shoulder, hip, knee).
            1 - 6 objects (static top, nose, left hip, right hip, left knee,
                right knee), fully connected: 30 relations.
            2 - the same plus a static bottom object: 7 objects, 42 relations.

    Returns:
        ``torch.FloatTensor`` of shape (objects, relations) moved to
        ``device``, or None for any other ``ad`` value.
    """
    def fully_connected(num_obj):
        # One relation per ordered (sender, receiver) pair with
        # sender != receiver. Relations are grouped by sender, receivers in
        # ascending order.
        num_rel = num_obj * (num_obj - 1)
        senders = [[0] * num_rel for _ in range(num_obj)]
        receivers = [[0] * num_rel for _ in range(num_obj)]
        column = 0
        for sender in range(num_obj):
            for receiver in range(num_obj):
                if receiver == sender:
                    continue
                senders[sender][column] = 1
                receivers[receiver][column] = 1
                column += 1
        return senders, receivers

    if ad == 0:
        snd = [[1, 0, 0, 0],   # nose
               [0, 0, 1, 0],   # sholder
               [0, 1, 0, 0],   # hip
               [0, 0, 0, 1]]   # knee
        rec = [[0, 0, 0, 0],   # nose
               [1, 1, 0, 0],   # sholder
               [0, 0, 1, 1],   # hip
               [0, 0, 0, 0]]   # knee
    elif ad == 1:
        snd, rec = fully_connected(6)
    elif ad == 2:
        snd, rec = fully_connected(7)
    else:
        return None

    chosen = snd if send else rec
    return torch.FloatTensor(chosen).to(device)
def to_matrix(arr0, arr1, ad, diff, normal, td):
    """Turn two consecutive flat frames into a per-object feature matrix.

    ``arr0`` and ``arr1`` are flat lists laid out as consecutive
    (x, y, z) triples, one triple per object. Each object becomes one row.

    Args:
        arr0: previous frame (flat list of coordinates).
        arr1: current frame (flat list of coordinates, same layout).
        ad: number of static anchor rows to add. ``> 0`` prepends a top
            anchor at (0.5, 1.1); ``> 1`` also appends a bottom anchor at
            (0.5, -0.1).
        diff: include ``arr1 - arr0`` per coordinate in each row.
        normal: include the ``arr1`` coordinates themselves in each row.
        td: truthy for 2-D rows (x, y), falsy for 3-D rows (x, y, z).

    Returns:
        List of rows (lists of floats). Anchor rows carry a position block
        when ``normal`` is set or ``diff`` is not, and a zero difference
        block when ``diff`` is set.

    Note:
        The 3-D position+difference bottom anchor used to be
        ``[0.5, 0., ...]`` while every other bottom-anchor variant uses
        y = -0.1; it now uses -0.1 as well.
    """
    dims = 2 if td else 3

    def anchor(x, y):
        # Static object: fixed position, zero movement between frames.
        row = []
        if normal or not diff:
            row += [x, y] + [0.] * (dims - 2)
        if diff:
            row += [0.] * dims
        return row

    lst = []
    if ad > 0:
        lst.append(anchor(0.5, 1.1))

    # The stride is always 3 because the input holds (x, y, z) triples
    # even when only x and y are emitted.
    for i in range(0, len(arr0), 3):
        row = []
        if normal:
            row += arr1[i:i + dims]
        if diff:
            row += [arr1[i + k] - arr0[i + k] for k in range(dims)]
        lst.append(row)

    if ad > 1:
        lst.append(anchor(0.5, -0.1))
    return lst
"""### Tranform"""
def transform(dct, ad, diff, normal, td):
    """Build (source, target) training tensors from a dict of recordings.

    Each value of ``dct`` is indexed as ``item[0]`` (the DataFrame) and
    ``item[2]`` (the coordinate column list). The columns are smoothed and
    normalised, then every run of three consecutive frames (prev, cur, nxt)
    yields one source matrix from (prev, cur) and one target matrix from
    (cur, nxt).

    Returns:
        Tuple of two float tensors on ``device``, each with dimensions 1 and
        2 swapped, i.e. (samples, features, objects).
    """
    source, target = [], []
    for item in dct.values():
        columns = item[2][:]
        frames = smooth(item[0], columns, True)[item[2][:]].values.tolist()
        for prev, cur, nxt in zip(frames, frames[1:], frames[2:]):
            source.append(to_matrix(prev, cur, ad, diff, normal, td))
            target.append(to_matrix(cur, nxt, ad, diff, normal, td))

    source_tensor = torch.transpose(torch.FloatTensor(source).to(device), 1, 2)
    target_tensor = torch.transpose(torch.FloatTensor(target).to(device), 1, 2)
    return (source_tensor, target_tensor)
"""# IR
## Network
"""
class Inter_network(nn.Module):
    '''Interaction network: a relation MLP followed by an object MLP.

    The relation matrices have shape (num_obj, num_rel). ``forward`` expects
    a tensor whose matrix products with them are defined, i.e.
    (batch, obj_dim, num_obj).
    '''

    def __init__(self, rec_rel_mat,
                 send_rel_mat,
                 device,
                 obj_dim,
                 rel_sz,
                 relation_hidden_dim,
                 object_hidden_dim):
        super().__init__()
        self.device = device
        self.rec_rel_mat = rec_rel_mat
        self.send_rel_mat = send_rel_mat
        self.obj_dim = obj_dim
        self.num_obj, self.num_rel = rec_rel_mat.shape[0], rec_rel_mat.shape[1]
        self.rel_sz = rel_sz  # relation attribute dimension
        self.hidden_size_r = relation_hidden_dim
        self.hidden_size_o = object_hidden_dim
        self.effect = 50  # width of the per-relation effect vector

        # Relation attributes (ra) and per-object external input (x).
        if self.rel_sz == 1:
            # Plain random tensors: not registered as parameters, so they are
            # neither trained nor stored in the state dict.
            self.ra = torch.rand(self.num_rel, self.rel_sz).to(self.device)
            self.x = torch.rand(self.rel_sz, self.num_obj).to(self.device)
        else:
            self.ra = nn.Parameter(torch.zeros(self.num_rel, self.rel_sz))
            self.x = nn.Parameter(torch.zeros(self.rel_sz, self.num_obj))

        relation_in = self.obj_dim * 2 + self.rel_sz
        self.relational = nn.Sequential(
            nn.Linear(relation_in, self.hidden_size_r),
            nn.ReLU(),
            nn.Linear(self.hidden_size_r, self.hidden_size_r),
            nn.ReLU(),
            nn.Linear(self.hidden_size_r, self.hidden_size_r),
            nn.ReLU(),
            nn.Linear(self.hidden_size_r, self.effect),
        )

        object_in = self.obj_dim + self.effect + self.rel_sz
        self.object = nn.Sequential(
            nn.Linear(object_in, self.hidden_size_o),
            nn.ReLU(),
            nn.Linear(self.hidden_size_o, self.obj_dim),
        )

    def forward(self, object_mat):
        '''Run one interaction step and return a tensor shaped like the input.'''
        batch = object_mat.shape[0]
        send_b = self.send_rel_mat.repeat(batch, 1, 1)
        rec_b = self.rec_rel_mat.repeat(batch, 1, 1)

        # Gather sender / receiver states per relation: (batch, num_rel, obj_dim).
        senders = torch.bmm(object_mat, send_b).transpose(1, 2)
        receivers = torch.bmm(object_mat, rec_b).transpose(1, 2)

        relation_input = torch.cat(
            (senders, receivers, self.ra.repeat(batch, 1, 1)), dim=2)
        effects = self.relational(relation_input).transpose(1, 2)

        # Sum the effects arriving at each receiver: (batch, num_obj, effect).
        received = torch.bmm(effects, rec_b.transpose(1, 2)).transpose(1, 2)

        object_input = torch.cat(
            (received,
             object_mat.transpose(1, 2),
             self.x.transpose(0, 1).repeat(batch, 1, 1)), dim=2)
        return self.object(object_input).transpose(1, 2)
"""## Trainer Main model"""
class main_model(nn.Module):
    """Training, checkpointing and inference wrapper around ``Inter_network``.

    NOTE(review): ``train`` below shadows ``nn.Module.train(mode)``. PyTorch's
    ``Module.eval()`` calls ``self.train(False)``, so ``.eval()`` cannot be
    used on this class. Renaming would break existing callers, so the name is
    kept.
    """

    def __init__(self, rec_rel_mat, send_snd_mat, obj_dim, rel_sz, relation_hidden_dim,
                 object_hidden_dim, device):
        super(main_model, self).__init__()
        self.rel_sz = rel_sz
        self.device = device
        self.Inter_network = Inter_network(rec_rel_mat,
                                           send_snd_mat,
                                           device,
                                           obj_dim,
                                           rel_sz,
                                           relation_hidden_dim,
                                           object_hidden_dim)

    def train(self, source, target, reload, f_nm, n_epochs, batch_size=100, lr=0.001):
        """Fit the network with Adam on an MSE loss and write a checkpoint.

        Args:
            source: input tensor, first dimension indexes samples.
            target: expected output tensor, aligned with ``source``.
            reload: when true, restore model and optimizer state from ``f_nm``
                and continue from the epoch stored there.
            f_nm: checkpoint file path (read when ``reload``, always written).
            n_epochs: epoch index to train up to.
            batch_size: samples per step; a trailing partial batch is skipped.
            lr: Adam learning rate (previously ignored in favour of 0.001).

        Raises:
            ValueError: when there are fewer samples than ``batch_size``
                (this used to surface as a ZeroDivisionError).
        """
        optimizer = optim.Adam(self.parameters(), lr=lr)
        criterion = nn.MSELoss()

        n_batches = int(len(target) / batch_size)
        if n_batches == 0:
            raise ValueError(
                "need at least batch_size={} samples to train, got {}".format(
                    batch_size, len(target)))

        if reload:
            # Resume from the epoch recorded in the checkpoint.
            optimizer, start_epoch = self.load_ckp(f_nm, optimizer)
        else:
            start_epoch = 0

        for _ in range(start_epoch, n_epochs):
            # Reshuffle source and target with the same permutation every epoch.
            order = torch.randperm(source.shape[0])
            source = source[order]
            target = target[order]

            for b in range(n_batches):
                start = b * batch_size
                source_batch = source[start:start + batch_size]
                target_batch = target[start:start + batch_size]

                optimizer.zero_grad()
                outputs = self.Inter_network.forward(source_batch)
                loss = criterion(outputs, target_batch)
                loss.backward()
                optimizer.step()

        self.save_ckp(n_epochs, optimizer, f_nm)

    def reload(self, filename='checkpoint.pt'):
        """Restore the network weights from ``filename`` and report the epoch."""
        optimizer = optim.Adam(self.parameters(), lr=0.001)
        optimizer, start_epoch = self.load_ckp(filename, optimizer)
        print("Reloaded model trained for {} epochs.".format(start_epoch))

    # Thanks to https://medium.com/analytics-vidhya/saving-and-loading-your-model-to-resume-training-in-pytorch-cb687352fa61
    # The code has been modified as needed for this model.
    def save_ckp(self, epoch, optimizer, checkpoint_dir):
        """Write epoch, network weights and optimizer state to ``checkpoint_dir``."""
        checkpoint = {
            'epoch': epoch,
            'Inter_network_state_dict': self.Inter_network.state_dict(),
            'optimizer': optimizer.state_dict()
        }
        torch.save(checkpoint, checkpoint_dir)

    def load_ckp(self, checkpoint_fpath, optimizer):
        """Load a checkpoint into the network and ``optimizer``.

        Returns:
            ``(optimizer, epoch)`` as stored in the checkpoint.
        """
        # NOTE: torch.load unpickles the file; only load trusted checkpoints.
        checkpoint = torch.load(checkpoint_fpath, map_location=self.device)
        self.Inter_network.load_state_dict(checkpoint['Inter_network_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        return (optimizer, checkpoint['epoch'])

    def predict(self, source, target=None):
        """Run the network on ``source`` and return the output on the CPU.

        A 2-D ``source`` is treated as a single sample and gets a batch
        dimension added. When ``target`` is given the MSE against it is
        printed.
        """
        if source.dim() == 2:
            source = source.unsqueeze(0)
        output = self.Inter_network.forward(source)
        if target is not None:
            loss = nn.MSELoss()
            print('MSE loss over the value {} '.format(loss(output, target)))
        return output.cpu()
"""# model
"""
"""### Func"""
def plt_dtft_helper(f):
    """Evaluate the DTFT of ``f`` at 11 frequencies starting at -pi.

    Adapted from
    https://gist.github.com/TheRealMentor/018aab68dc4bb55bb8d9a390f657bd1d

    Args:
        f: sequence of samples.

    Returns:
        ``(x1, magnitudes, phases)``: the first 11 fft-shifted frequency
        points, then the magnitude and phase (11 values each) of the
        fft-shifted transform.
    """
    n = 11
    # Frequency grid from -pi towards pi in steps of 2*pi/n.
    step = 2 * ((math.pi) / n)
    freqs = np.arange(-(math.pi), math.pi, step)
    x1 = np.fft.fftshift(freqs)[:n]

    # Direct evaluation of sum_t f[t] * exp(-j * w_k * t) for the first n
    # grid points.
    spectrum = []
    for k in range(n):
        acc = 0
        for t, sample in enumerate(f):
            acc += sample * cm.exp(-1j * freqs[k] * t)
        spectrum.append(acc)

    shifted = np.fft.fftshift(spectrum)
    magnitudes = [math.sqrt(c.real ** 2 + c.imag ** 2) for c in shifted[:n]]
    phases = [math.atan2(c.imag, c.real) for c in shifted[:n]]
    return x1, magnitudes, phases
def get_dtft(mse_n, mse_lh, mse_rh, mse_lk, mse_rk, mse_a, label):
    """Flatten the DTFT magnitude and phase of six error series into one row.

    For each series, in argument order, the magnitudes are appended followed
    by the phases. ``label`` is appended last unless it is None.

    Args:
        mse_n, mse_lh, mse_rh, mse_lk, mse_rk, mse_a: error sequences.
        label: optional trailing value. It is tested with ``is not None``
            so array-like labels no longer trigger an element-wise comparison.

    Returns:
        Flat list of features (plus the label when given).
    """
    data = []
    for series in (mse_n, mse_lh, mse_rh, mse_lk, mse_rk, mse_a):
        _, mag, phs = plt_dtft_helper(series)
        data.extend(mag)
        data.extend(phs)
    if label is not None:
        data.append(label)
    return data
def predict(model, scaler_model, classifier_model, source, target):
    """Classify one recording from the model's closed-loop prediction error.

    Only ``source[0]`` is fed to ``model`` as real input; every later step
    feeds the previous prediction back in. The squared error against
    ``target[k]`` is recorded per step for rows 1..5 of the object axis
    (first two features only), together with their mean. The DTFT features
    of those error series are scaled and passed to the classifier.

    Args:
        model: object exposing ``predict(tensor)`` that returns a batched
            tensor.
        scaler_model: object exposing ``transform(rows)``.
        classifier_model: object exposing ``predict(rows)``.
        source: sequence of input tensors; its length sets the step count.
        target: sequence of expected tensors, indexable by step.

    Returns:
        Single-element list holding the classifier's prediction.
    """
    mse_n = []   # nose
    mse_lh = []  # left hip
    mse_rh = []  # right hip
    mse_lk = []  # left knee
    mse_rk = []  # right knee
    mse_a = []   # all

    def mse(true, pred):
        # Keep objects 1..5 and their first two features.
        true = torch.transpose(true, 0, 1)[1:6, :2]
        pred = torch.transpose(pred, 0, 1)[1:6, :2]
        t = pow((true - pred), 2)
        t = t[:, 0] + t[:, 1]
        t = t.cpu().clone().detach()
        tt = sum(t.tolist()) / (t.shape[0] * 2)
        t = (t / 2).tolist()
        mse_n.append(t[0])
        mse_lh.append(t[1])
        mse_rh.append(t[2])
        mse_lk.append(t[3])
        mse_rk.append(t[4])
        mse_a.append(tt)

    inpt = source[0]
    # Inference only: without no_grad the autograd graph would keep growing
    # as each prediction is fed back in as the next input.
    with torch.no_grad():
        for k in range(len(source)):
            temp = model.predict(inpt)
            inpt = temp.to(device)
            mse(target[k], temp[0].to(device))

    error_sign = [get_dtft(mse_n, mse_lh, mse_rh, mse_lk, mse_rk, mse_a, None)]
    output = scaler_model.transform(error_sign)
    return [classifier_model.predict(output)[0]]
def load_label_mapping():
    """Read the label names from ``label_mapping.csv``.

    The file is looked up at ``ML_DATASET + path + 'label_mapping.csv'``.
    The first comma-separated field of every line becomes one label, in
    file order.

    Returns:
        List of label strings.
    """
    # The context manager closes the file even if reading fails.
    with open(ML_DATASET+path+'label_mapping.csv', 'r') as file:
        content = file.read()
    return [line.strip().split(',')[0] for line in content.strip().split('\n')]
class pipeline:
    """End-to-end inference: landmark DataFrame in, label names out."""

    def __init__(self):
        """Build the relation matrices and load every saved model from disk."""
        self.rel_rec_mat = rel_matrix(False, 1)
        self.rel_snd_mat = rel_matrix(True, 1)
        self.label_mapping = load_label_mapping()
        self.in_model = main_model(self.rel_rec_mat,
                                   self.rel_snd_mat,
                                   4,    # obj_dim
                                   20,   # rel_sz
                                   150,  # relation_hidden_dim
                                   150,  # object_hidden_dim
                                   device).to(device)
        self.in_model.reload(ML_DATASET+path+"models/in_model.pt")
        # NOTE: pickle executes code on load; these files must be trusted.
        # The context managers close the file handles, which used to leak.
        with open(ML_DATASET+path+'models/scaler_model.sav', 'rb') as scaler_file:
            self.scaler_model = pickle.load(scaler_file)
        with open(ML_DATASET+path+'models/classifier_model.sav', 'rb') as classifier_file:
            self.classifier_model = pickle.load(classifier_file)

    def predict(self, df):
        """Classify one recording and return its label names as a list."""
        right_left, select = left_or_right(df)
        # select[1:] drops the leading 'frame' column.
        dct = {'rep': [df, right_left, select[1:]]}
        source, target = transform(dct, 1, True, True, True)
        # The bare name resolves to the module-level predict(), not this method.
        prediction = predict(self.in_model, self.scaler_model,
                             self.classifier_model, source, target)
        return [self.label_mapping[i] for i in prediction]