# -*- coding: utf-8 -*-
"""
Automatically generated by Colaboratory.

# setup
"""

path = '/squats/'

"""# imports"""
import pickle
import pandas as pd
import numpy as np
import statsmodels.api as sm
from scipy.signal import find_peaks, peak_prominences

# DTFT helpers
import cmath as cm

import random
import math
from datetime import datetime

import torch
import torch.nn as nn
import torch.nn.init as init
from torch import optim
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_sequence

from config.settings.base import ML_DATASET

# check whether CUDA is available
if torch.cuda.is_available():
    dev = "cuda:0"
else:
    dev = "cpu"
device = torch.device(dev)

import warnings
warnings.filterwarnings("ignore")

"""# functions"""

def left_right_confidence(data):
    """Heuristic facing check: compares the x positions of landmark pairs 31/29 and
    32/30 over the first 10 frames, weighting each vote by the landmarks' visibility.
    Returns True when the right-facing confidence wins."""
    left_facing_con = 0
    right_facing_con = 0

    if sum(data.iloc[:10]['x_31']) < sum(data.iloc[:10]['x_29']):
        # facing right
        score = (sum(data.iloc[:10]['visibility_31'])/10 + sum(data.iloc[:10]['visibility_29'])/10) / 2
        right_facing_con += score
        #left_facing_con += (1-score)
    else:
        # facing left
        score = (sum(data.iloc[:10]['visibility_31'])/10 + sum(data.iloc[:10]['visibility_29'])/10) / 2
        #right_facing_con += (1-score)
        left_facing_con += score

    if sum(data.iloc[:10]['x_32']) < sum(data.iloc[:10]['x_30']):
        # facing right
        score = (sum(data.iloc[:10]['visibility_32'])/10 + sum(data.iloc[:10]['visibility_30'])/10) / 2
        right_facing_con += score
        #left_facing_con += (1-score)
    else:
        # facing left
        score = (sum(data.iloc[:10]['visibility_32'])/10 + sum(data.iloc[:10]['visibility_30'])/10) / 2
        #right_facing_con += (1-score)
        left_facing_con += score

    return left_facing_con < right_facing_con

def left_or_right(data):
    '''
    Tell which side the user is facing and which landmark columns to use.
      True,  column list: facing right
      False, column list: facing left
    Currently always returns False together with a fixed landmark set
    (0 = nose, 23/24 = hips, 25/26 = knees).
    '''
    select = ['frame']
    facing = False
    indices = [0, 23, 24, 25, 26]

    for index in indices:
        for axis in ('x_', 'y_', 'z_'):
            select.append(axis + str(index))

    return facing, select
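
# For the indices above, `select` works out to
#   ['frame', 'x_0', 'y_0', 'z_0', 'x_23', 'y_23', 'z_23', 'x_24', ..., 'z_26']
# i.e. one x/y/z column triple per selected landmark, matching the column naming of the
# pose CSV that read_file() loads below.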

def read_file(file_name):
    df = pd.read_csv(file_name)
    #df.set_index('frame')
    right_left, select = left_or_right(df)
    return df, right_left, select[1:]

def max_min(df,select):
    mn = 10.0
    mx = -10.0

    for i in select:
      mn = min(mn, min(df[i]))
      mx = max(mx, max(df[i]))
    return mx,mn

def smooth(df, select, norm=True, f=0):

    new_df = pd.DataFrame()
    if f == 0:
        f = min(0.03, (41 / len(df)))
    time = [i for i in range(len(df))]

    cols = select  # avoid shadowing the built-in all()

    for i in cols:
        new_df[i] = sm.nonparametric.lowess(df[i].values, time, frac=f,
                                            it=3, delta=0.0, is_sorted=True,
                                            missing='drop', return_sorted=False)

    if norm:
        x_max, x_min = max_min(new_df, cols[0::3])
        y_max, y_min = max_min(new_df, cols[1::3])
        z_max, z_min = max_min(new_df, cols[2::3])

        # normalise x
        for i in select[0::3]:
            new_df[i] = (new_df[i] - x_min) / (x_max - x_min)

        # normalise y (flipped so larger values mean higher)
        for i in select[1::3]:
            new_df[i] = 1 - ((new_df[i] - y_min) / (y_max - y_min))

        # normalise z
        for i in select[2::3]:
            new_df[i] = (new_df[i] - z_min) / (z_max - z_min)

    new_df = new_df[select]

    return new_df
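
# Minimal usage sketch, assuming a per-rep pose CSV with the frame/x_/y_/z_ columns
# used above ('rep.csv' is a made-up file name):
#   df, facing_right, select = read_file('rep.csv')
#   clean = smooth(df, select)   # LOWESS-smoothed, per-axis min-max normalised copy
# The result keeps one row per frame and only the columns in `select`.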

def rel_matrix(send,ad=1):
    if ad == 0:
        if send:
          snd =[#[1,2,3,4],
               [1,0,0,0],# nose
               [0,0,1,0],# shoulder
               [0,1,0,0],# hip
               [0,0,0,1]]# knee
          return torch.FloatTensor(snd).to(device)
        else:
          rec =[#[1,2,3,4],
               [0,0,0,0],# nose
               [1,1,0,0],# shoulder
               [0,0,1,1],# hip
               [0,0,0,0]]# knee
          return torch.FloatTensor(rec).to(device)
    elif ad == 1:
        if send:
            snd =[#[1,2,3,4,5],
                   [1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# static top
                   [0,0,0,0,0,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# nose
                   [0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# left hip
                   [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0],# right hip
                   [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,0,0,0,0,0],# left knee
                   [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1]]# right knee
            return torch.FloatTensor(snd).to(device)
        else:
            rec =[#[1,2,3,4,5,6],
                   [0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0],# static top
                   [1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0],# nose
                   [0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0],# left hip
                   [0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0],# right hip
                   [0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1],# left knee
                   [0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0]]# right knee
            return torch.FloatTensor(rec).to(device)
    elif ad == 2:
        if send:
            snd =[#[1,2,3,4,5,6],
                   [1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# static top
                   [0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# nose
                   [0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# left hip
                   [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],# right hip
                   [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0],# left knee
                   [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0],# right knee
                   [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1]]# static bottom
            return torch.FloatTensor(snd).to(device)
        else:
            rec =[#[1,2,3,4,5,6],
                   [0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0],# static top
                   [1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0],# nose
                   [0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0],# left hip
                   [0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0],# right hip
                   [0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0],# left knee
                   [0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1],# right knee
                   [0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0]]# static bottom
            return torch.FloatTensor(rec).to(device)
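
# For ad=1 the matrices above describe 6 objects (static top, nose, left/right hip,
# left/right knee) and 30 directed relations: every relation column has exactly one 1 in
# the sender matrix and one 1 in the receiver matrix, i.e. a fully connected graph
# without self-loops (6 x 5 = 30 ordered pairs).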

def to_matrix(arr0,arr1,ad,diff,normal,td):
    lst = []
    i = 0
    if ad > 0:
        if diff and normal:
            if td:
              lst.append([0.5,1.1,0.,0.])
            else:
              lst.append([0.5,1.1,0.,0.,0.,0.])
        elif diff:
            if td:
              lst.append([0.,0.])
            else:
              lst.append([0.,0.,0.])
        else:
            if td:
              lst.append([0.5,1.1])
            else:
              lst.append([0.5,1.1,0.])
    while i < len(arr0):
        d = []
        if normal:
            if td: #2d
              d += arr1[i:i+2]
            else: #3d
              d += arr1[i:i+3]
        if diff:
            if td:
              d += [arr1[i+0]-arr0[i+0], arr1[i+1]-arr0[i+1]]
            else:
              d += [arr1[i+0]-arr0[i+0], arr1[i+1]-arr0[i+1], arr1[i+2]-arr0[i+2]]
        lst.append(d)
        i += 3
    if ad > 1:
        if diff and normal:
            if td:
              lst.append([0.5,-0.1,0.,0.])
            else:
              lst.append([0.5,0.,0.,0.,0.,0.])
        elif diff:
            if td:
              lst.append([0.,0.])
            else:
              lst.append([0.,0.,0.])
        else:
            if td:
              lst.append([0.5,-0.1])
            else:
              lst.append([0.5,-0.1,0.])
    return lst
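
# Each row built by to_matrix is one object's feature vector. With normal=True,
# diff=True and td=True (the 2-D case used by the pipeline below) that is
# [x, y, dx, dy], where dx/dy are the frame-to-frame differences; ad=1 prepends a
# constant "static top" object [0.5, 1.1, 0., 0.] and ad=2 also appends a
# "static bottom" object.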

"""### Tranform"""

def transform(dct,ad,diff,normal,td):
    source = []
    target = []
    for _,item in dct.items():
        lst = smooth(item[0],item[2][:],True)[item[2][:]].values.tolist()
        # lst = item[0][item[2][3:]].values.tolist()
        for i in range(1,len(lst)-1):
            source.append( to_matrix(lst[i-1],lst[i],ad,diff,normal,td) )
            target.append( to_matrix(lst[i],lst[i+1],ad,diff,normal,td) )

    return ( torch.transpose( torch.FloatTensor(source).to(device), 1,2),
             torch.transpose( torch.FloatTensor(target).to(device), 1,2) )
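
# transform() pairs consecutive frames: source holds the current frame's object matrix
# (positions plus frame-to-frame deltas) and target the next frame's. After the final
# transpose each tensor has shape (num_samples, obj_dim, num_objects), e.g. (N, 4, 6)
# for the ad=1, 2-D, diff+normal configuration used by the pipeline.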

"""# IR

## Network
"""

class Inter_network(nn.Module):
    '''Interaction networks'''
    def __init__(self,rec_rel_mat,
                 send_rel_mat,
                 device,
                 obj_dim,
                 rel_sz,
                 relation_hidden_dim,
                 object_hidden_dim):
        super(Inter_network, self).__init__()
    
        # initializing parameters
        self.device = device
        self.rec_rel_mat = rec_rel_mat
        #self.rec_rel_mat.requires_grad = True
        self.send_rel_mat = send_rel_mat
        #self.send_rel_mat.requires_grad = True

        self.obj_dim = obj_dim
        self.num_obj = rec_rel_mat.shape[0]
        self.num_rel = rec_rel_mat.shape[1]
        self.rel_sz = rel_sz # relation dimension
        
        self.hidden_size_r = relation_hidden_dim
        self.hidden_size_o = object_hidden_dim
        self.effect = 50

        #self.m1 = torch.tensor([[1.],[1.],[0.],[0.]]).to(self.device)
        #self.m2 = torch.tensor([[0.],[0.],[1.],[1.]]).to(self.device)

    
        # relation
        if self.rel_sz == 1:
            self.ra = torch.rand(self.num_rel, self.rel_sz).to(self.device)
            self.x = torch.rand(self.rel_sz, self.num_obj).to(self.device)
        else:
            self.ra = nn.Parameter(torch.zeros(self.num_rel, self.rel_sz))
            self.x = nn.Parameter(torch.zeros(self.rel_sz, self.num_obj))
        

        self.relational = nn.Sequential(
            nn.Linear(self.obj_dim * 2 + self.rel_sz, self.hidden_size_r),
            nn.ReLU(),
            nn.Linear(self.hidden_size_r, self.hidden_size_r),
            nn.ReLU(),
            nn.Linear(self.hidden_size_r, self.hidden_size_r),
            nn.ReLU(),
            nn.Linear(self.hidden_size_r, self.effect)
        )
        
        self.object = nn.Sequential(
            nn.Linear(self.obj_dim + self.effect + self.rel_sz, self.hidden_size_o),
            nn.ReLU(),
            nn.Linear(self.hidden_size_o, self.obj_dim)
        )
    
    def forward(self, object_mat):
        batch_sz = object_mat.shape[0]
        o_send = torch.transpose(torch.bmm(object_mat,self.send_rel_mat.repeat(batch_sz, 1, 1)),1,2)
        o_rec =  torch.transpose( torch.bmm(object_mat,self.rec_rel_mat.repeat(batch_sz, 1, 1)),1,2)
        #dif = o_send - o_rec
        #slope1 = dif[:,:,1]/dif[:,:,0]
        #slope2 = dif[:,:,3]/dif[:,:,2]
        #dif = dif * dif
        #dist1 = torch.bmm(dif,self.m1.repeat(batch_sz,1,1))
        #dist2 = torch.bmm(dif,self.m2.repeat(batch_sz,1,1))
        
        output = torch.cat((o_send,o_rec,
                            self.ra.repeat(batch_sz, 1, 1) ),dim = 2)
        output = torch.transpose(self.relational(output),1,2)
        output = torch.bmm(output,torch.transpose(self.rec_rel_mat.repeat(batch_sz, 1, 1),1,2))
        output = torch.transpose( output,1,2 )
        object_mat = torch.transpose( object_mat,1,2 )
        output = torch.cat(( output,object_mat,
                             torch.transpose(self.x,0,1).repeat(batch_sz, 1, 1) ),dim = 2)
        return torch.transpose(self.object(output),1,2)
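
# Forward pass in interaction-network terms: object states are gathered per relation via
# the sender/receiver matrices, the relational MLP maps each (sender, receiver,
# relation attribute) triple to a 50-d effect, effects are summed back onto the
# receiving objects through rec_rel_mat, and the object MLP maps
# (aggregated effect, object state, per-object attribute x) to the predicted next state.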

"""## Trainer Main model"""

class main_model(nn.Module):
    def __init__(self,rec_rel_mat,send_snd_mat,obj_dim,rel_sz,relation_hidden_dim,
                 object_hidden_dim,device):
        super(main_model, self).__init__()
        self.rel_sz = rel_sz
        self.device = device
        self.Inter_network = Inter_network(rec_rel_mat,
                                           send_snd_mat,
                                           device,
                                           obj_dim,
                                           rel_sz,
                                           relation_hidden_dim,
                                           object_hidden_dim)

    def train(self, source, target, reload, f_nm, n_epochs, batch_size=100, lr=0.001):
        # initialize the Adam optimizer and the MSE loss criterion
        optimizer = optim.Adam(self.parameters(), lr=lr)
        criterion = nn.MSELoss()

        # calculate number of batch iterations
        n_batches = int(len(target)/batch_size)

        if reload:
            # if True, reload the model from the saved checkpoint and resume training from the epoch where it last stopped
            optimizer, start_epoch = self.load_ckp(f_nm, optimizer)
            #print("Resuming training from epoch {}".format(start_epoch))
        else:
            # else start training from epoch 0
            start_epoch = 0

        # start time
        stime = datetime.now().timestamp()
        
        train_loss = 0.
        
        for i in range(start_epoch,n_epochs):
          
            #shuffle
            r = torch.randperm(source.shape[0])
            source = source[r[:, None]].squeeze(1)
            target = target[r[:, None]].squeeze(1)

            # initalize batch_loss to 0.
            batch_loss = 0.

            for b in range(n_batches):   
                source_batch = source[b*batch_size: (b*batch_size) + batch_size]
                target_batch = target[b*batch_size: (b*batch_size) + batch_size]
        
                # initialize outputs tensor to save output from the training
                outputs = torch.zeros(batch_size,target_batch.shape[1], target_batch.shape[2], requires_grad=True).to(self.device)
                
                # zero the gradient
                optimizer.zero_grad()

                outputs = self.Inter_network.forward(source_batch)
                
                loss = criterion(outputs,target_batch)
                #print("single loss",loss.item())
                batch_loss += loss.item()
                
                # backpropagation
                loss.backward()
                optimizer.step()

            # average loss over this epoch's batches
            batch_loss = batch_loss/(n_batches)
            #print(target_batch,"out=",outputs)
            #print("batch loss", batch_loss)
        # save a checkpoint once training completes
        self.save_ckp(n_epochs,optimizer,f_nm)
        #train_loss = train_loss / (n_epochs - start_epoch)
        #return train_loss
    def reload(self,filename = 'checkpoint.pt'):
        optimizer = optim.Adam(self.parameters(), lr = 0.001)
        optimizer, start_epoch = self.load_ckp(filename, optimizer)
        print("Reloaded model trained for {} epochs.".format(start_epoch))

    #Thanks to https://medium.com/analytics-vidhya/saving-and-loading-your-model-to-resume-training-in-pytorch-cb687352fa61
    # the code has been modified to fit the needs of this model
    def save_ckp(self,epoch,optimizer,checkpoint_dir):
        #save checkpoint to resume training
        checkpoint = {
            'epoch': epoch,
            'Inter_network_state_dict': self.Inter_network.state_dict(),
            'optimizer': optimizer.state_dict()
        }
        torch.save(checkpoint, checkpoint_dir)

    def load_ckp(self,checkpoint_fpath, optimizer):
        # reloads the model from the checkpoint
        checkpoint = torch.load(checkpoint_fpath,map_location=self.device)
        self.Inter_network.load_state_dict(checkpoint['Inter_network_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        return ( optimizer, checkpoint['epoch'])

    def predict(self,source,target= None):
        if source.dim() == 2:
            source = source.unsqueeze(0)
        output =  self.Inter_network.forward(source)
        #output = torch.transpose(output,1,2)

        loss = nn.MSELoss()
        if target is not None:
            print('MSE loss against the target: {}'.format(loss(output, target)))
        return output.cpu()
        #return loss(output,target)
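
# Hypothetical training/inference sketch (file name and epoch count are made up):
#   model = main_model(rel_matrix(False, 1), rel_matrix(True, 1),
#                      4, 20, 150, 150, device).to(device)
#   model.train(source, target, reload=False, f_nm='in_model.pt', n_epochs=50)
#   model.reload('in_model.pt')            # restore weights + optimizer state later
#   next_state = model.predict(source[0])  # one-step prediction, shape (1, 4, 6)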
"""# model
"""


"""### Func"""

def plt_dtft_helper(f):
    # https://gist.github.com/TheRealMentor/018aab68dc4bb55bb8d9a390f657bd1d

    # Defining the DTFT function (n is taken from the enclosing scope)
    def dtft(f, pt):
        output = [0]*n
        for k in range(n):
            s = 0
            for t in range(len(f)):
                s += f[t] * cm.exp(-1j * pt[k] * t)
            output[k] = s
        return output

    #Calculating the magnitude of DTFT
    def magnitude(inp,n):
        output = [0]*n
        for t in range(0,n):
            tmp=inp[t]
            output[t]= math.sqrt(tmp.real**2 + tmp.imag**2)
        return output

    #Calculating the phase 
    def phase(inp,n):
        output = [0]*n
        for t in range(0,n):
            tmp=inp[t]
            output[t]= math.atan2(tmp.imag,tmp.real)
        return output

    n = 11
    # frequency-axis step size: n points across [-pi, pi)
    N = 2*((math.pi)/n)
    x = np.arange(-(math.pi),math.pi,N)
    x1 = np.fft.fftshift(x)
    x1 = x1[:n]

    #Using the function that I made
    made_func = dtft(f,x)
    made_func_shift=np.fft.fftshift(made_func)
    made_func_shift_mag = magnitude(made_func_shift,n)
    made_func_shift_phs = phase (made_func_shift,n)

    #Using the inbuilt function
    #inbuilt = np.fft.fft(f,n)
    #inbuilt_mag = magnitude(inbuilt,n)
    #inbuilt_phs = phase (inbuilt,n)

    return x1, made_func_shift_mag, made_func_shift_phs
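
# plt_dtft_helper returns (frequency bins, magnitudes, phases), each of length n = 11,
# for the fftshift-ed spectrum of the input error sequence.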

  
def get_dtft(mse_n,mse_lh,mse_rh,mse_lk,mse_rk,mse_a,label):
    data = []
    def write(lst):
      _, mag, phs = plt_dtft_helper(lst)
      for i in mag:
        data.append(i)
      for i in phs:
        data.append(i)

    write(mse_n)
    write(mse_lh)
    write(mse_rh)
    write(mse_lk)
    write(mse_rk)
    write(mse_a)
  
    if label is not None:
      data.append(label)
    
    return data
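
# The resulting feature vector is 6 error streams x (11 magnitudes + 11 phases) = 132
# values, plus the label when one is supplied; this is what the scaler and classifier
# models below consume.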

def predict(model,scaler_model,classifier_model,source,target):
    def mse(true,pred):
      true = torch.transpose(true,0,1)[1:6,:2]
      pred = torch.transpose(pred,0,1)[1:6,:2]
      t = pow((true - pred),2)
      t = t[:,0] + t[:,1]
      t = t.cpu().clone().detach()
      tt = sum(t.tolist())/(t.shape[0] * 2)
      t = (t/2).tolist()
      mse_n.append(t[0])
      mse_lh.append(t[1])
      mse_rh.append(t[2])
      mse_lk.append(t[3])
      mse_rk.append(t[4])
      mse_a.append(tt)

    pre_val = []
    mse_n = [] # nose
    mse_lh = [] # left hip
    mse_rh = [] # right hip
    mse_lk = [] # left knee
    mse_rk = [] # right knee
    mse_a = [] # all

    inpt = source[0]

    for k in range(len(source)):
      temp = model.predict(inpt)
      inpt = temp.to(device)
      mse(target[k],temp[0].to(device).clone().detach())
      pre_val.append(temp[0].tolist())
    prediction = []
    error_sign = [get_dtft(mse_n,mse_lh,mse_rh,mse_lk,mse_rk,mse_a,None)]
    output = scaler_model.transform(error_sign)
    prediction.append(classifier_model.predict(output)[0])
    return prediction
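
# Note: the loop above rolls the interaction network forward auto-regressively (each
# prediction is fed back as the next input) and records per-joint squared errors against
# the observed next frame; the DTFT features of those error traces are what the scaler
# and classifier score.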

def load_label_mapping():
    labels_map = []
    with open(ML_DATASET+path+'label_mapping.csv', 'r') as file:
        for line in file.read().strip().split('\n'):
            line = line.strip().split(',')
            labels_map.append(line[0])
    return labels_map
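
# Assumes each row of label_mapping.csv stores the human-readable label name in its
# first column, ordered so that labels_map[i] corresponds to classifier output i.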
  
class pipeline:
    def __init__(self):
      self.rel_rec_mat = rel_matrix(False,1)
      self.rel_snd_mat = rel_matrix(True,1)
    
      self.label_mapping = load_label_mapping()
    
      self.in_model = main_model(self.rel_rec_mat,
                        self.rel_snd_mat,
                        4,
                        20,
                        150,
                        150,
                        device).to(device)
      self.in_model.reload(ML_DATASET+path+"models/in_model.pt")
      self.scaler_model = pickle.load(open(ML_DATASET+path+'models/scaler_model.sav', 'rb'))
      self.classifier_model = pickle.load(open(ML_DATASET+path+'models/classifier_model.sav', 'rb'))

    def predict(self,df):
      right_left,select = left_or_right(df)
      dct = {}
      dct['rep'] = [df,right_left,select[1:]]
      source,target = transform(dct,1,True,True,True)
      labels = []
      prediction = predict(self.in_model,self.scaler_model,self.classifier_model,source,target)
      for i in prediction:
        labels.append(self.label_mapping[i])
      return labels
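
# Hypothetical end-to-end usage ('rep.csv' is a made-up file name; the DataFrame must
# contain the frame/x_/y_/z_ landmark columns produced upstream):
#   df = pd.read_csv('rep.csv')
#   labels = pipeline().predict(df)   # list with one label string from label_mapping.csv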