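# NOTE: the following header is page residue from the pastecode.io listing this
# script was copied from; it is not part of the program.
#   Untitled
#   mail@pastecode.io avatar
#   unknown | plain_text | 5 months ago | 16 kB | 3 | Indexable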
import tqdm
import random
import pathlib
import itertools
import collections
import os

import pandas as pd
import cv2
import numpy as np
import matplotlib.pyplot as plt

import keras
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy

from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_model

class Augmentations:
    """Random image augmentations whose sampled parameters can be reused.

    A set of parameters is drawn by ``initialize_params`` and cached in
    ``self.params``. Calling the instance re-applies the cached draw until a
    new one is requested with ``initialize=True``, so consecutive images can
    receive exactly the same transforms.

    Args:
        prob_rotation: Probability of sampling a rotation angle.
        prob_zoom_out: Probability of sampling a zoom-out factor.
        prob_contrast: Probability of sampling a contrast gain.
        prob_brightness: Probability of sampling a brightness offset.
        prob_perspective: Probability of sampling a perspective warp.
    """

    def __init__(self, prob_rotation=0.5, prob_zoom_out=0.5, prob_contrast=0.5, prob_brightness=0.5, prob_perspective=0.5):
        # Per-transform probabilities supplied by the caller.
        self.prob_rotation = prob_rotation
        self.prob_zoom_out = prob_zoom_out
        self.prob_contrast = prob_contrast
        self.prob_brightness = prob_brightness
        self.prob_perspective = prob_perspective

        # Fixed magnitude limits for each transform.
        self.max_rotation = 8  # angle is drawn from [-8, 8] and passed to getRotationMatrix2D
        self.max_zoom_out = 0.7  # zoom factor is 1 - U(0, 0.7)
        self.contrast_range = (0.8, 1.6)
        self.brightness_range = (0.8, 1.4)
        self.perspective_transform_range = 0.03  # corner jitter as a fraction of the image width

        # Filled in by initialize_params(); None means "nothing sampled yet".
        self.params = None

    def initialize_params(self, image):
        """Draw a fresh set of augmentation parameters and cache it.

        Each entry of ``self.params`` is either the sampled value or ``None``
        when that transform was not selected. ``image`` is only used to read
        the height/width needed for the perspective warp.
        """

        def draw(probability, sampler):
            # One coin flip per transform, followed by the value draw if selected.
            return sampler() if np.random.rand() < probability else None

        def sample_perspective():
            h, w = image.shape[:2]
            corners = np.float32([
                [0, 0],
                [w, 0],
                [0, h],
                [w, h]
            ])
            # Both x and y offsets are scaled by the image width.
            limit = self.perspective_transform_range * w
            jitter = np.random.uniform(-limit, limit, corners.shape).astype(np.float32)
            return cv2.getPerspectiveTransform(corners, corners + jitter)

        # Values are evaluated top to bottom, which fixes the order of random draws.
        self.params = {
            'rotation': draw(
                self.prob_rotation,
                lambda: np.random.uniform(-self.max_rotation, self.max_rotation)),
            'zoom_factor': draw(
                self.prob_zoom_out,
                lambda: 1 - np.random.uniform(0, self.max_zoom_out)),
            'contrast_alpha': draw(
                self.prob_contrast,
                lambda: np.random.uniform(*self.contrast_range)),
            'brightness_beta': draw(
                self.prob_brightness,
                lambda: np.random.uniform(*self.brightness_range)),
            'perspective_matrix': draw(self.prob_perspective, sample_perspective),
        }

    def apply_augmentations(self, image):
        """Apply every cached (non-``None``) transform to ``image`` in a fixed order."""
        steps = (
            ('rotation', self.apply_rotation),
            ('zoom_factor', self.apply_zoom_out),
            ('contrast_alpha', self.apply_contrast),
            ('brightness_beta', self.apply_brightness),
            ('perspective_matrix', self.apply_perspective_transform),
        )
        for key, transform in steps:
            value = self.params[key]
            if value is not None:
                image = transform(image, value)
        return image

    def apply_rotation(self, image, angle):
        """Rotate ``image`` by ``angle`` about its centre, keeping the original size."""
        height, width = image.shape[:2]
        centre = (width/2, height/2)
        rotation = cv2.getRotationMatrix2D(centre, angle, 1)
        return cv2.warpAffine(image, rotation, (width, height))

    def apply_zoom_out(self, image, zoom_factor):
        """Shrink ``image`` by ``zoom_factor`` and centre it on a zero canvas of the original size."""
        height, width = image.shape[:2]
        new_height, new_width = int(height * zoom_factor), int(width * zoom_factor)
        top = (height - new_height)//2
        left = (width - new_width)//2

        canvas = np.zeros_like(image)
        canvas[top:top+new_height, left:left+new_width] = cv2.resize(image, (new_width, new_height))
        return canvas

    def apply_contrast(self, image, alpha):
        """Scale pixel values by ``alpha`` via ``cv2.convertScaleAbs``."""
        return cv2.convertScaleAbs(image, alpha=alpha, beta=0)

    def apply_brightness(self, image, beta):
        """Offset pixel values by ``beta`` via ``cv2.convertScaleAbs``.

        NOTE(review): ``beta`` is drawn from ``brightness_range`` (0.8-1.4) and
        passed as the ``beta`` argument; that looks very small if images are in
        the 0-255 range. Confirm the intended magnitude.
        """
        return cv2.convertScaleAbs(image, alpha=1, beta=beta)

    def apply_perspective_transform(self, image, matrix):
        """Warp ``image`` with the 3x3 ``matrix``, keeping the original size."""
        height, width = image.shape[:2]
        return cv2.warpPerspective(image, matrix, (width, height))

    def __call__(self, image, initialize=False):
        """Augment ``image``, sampling new parameters first if asked or if none exist."""
        needs_new_params = initialize or self.params is None
        if needs_new_params:
            self.initialize_params(image)
        return self.apply_augmentations(image)
class StreamingTrainMovinetClassifier(movinet_model.MovinetClassifier):
    """MoViNet classifier that is trained and evaluated one frame at a time.

    Each clip is split along axis 1 into single-frame tensors. The model is
    called once per frame with the states returned by the previous call, and
    in ``train_step`` the per-frame gradients are averaged into
    ``self.gradient_accumulation`` before a single optimizer update.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # One non-trainable float32 buffer per trainable variable; holds the
        # running average of per-frame gradients within a train step.
        self.gradient_accumulation = [
            tf.Variable(tf.zeros_like(v, dtype=tf.float32), trainable=False)
            for v in self.trainable_variables
        ]

    def _update_metrics(self, y, logits, sample_weight, total_loss):
        """Update compiled and tracked metrics and return ``{name: result}``.

        ``logits`` are the predictions of the last frame of the clip.
        """
        self.compiled_metrics.update_state(y, logits, sample_weight)
        for metric in self.metrics:
            # NOTE(review): "classifier_head_loss" is skipped by name; presumably
            # it is a per-output loss metric that cannot take (y, logits) — confirm.
            if metric.name == "classifier_head_loss":
                continue
            if metric.name == "loss":
                metric.update_state(total_loss)
            else:
                metric.update_state(y, logits)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

    def train_step(self, data):
        """Run one optimisation step over a batch of clips.

        Args:
            data: ``(x, y)`` or ``(x, y, sample_weight)``; ``x`` must have a
                statically known size on axis 1 (it is used as the split count).

        Returns:
            Dict mapping metric names to their current values.
        """
        # Accepts both 2- and 3-tuples; sample_weight is None when absent.
        x, y, sample_weight = tf.keras.utils.unpack_x_y_sample_weight(data)
        frames = tf.split(x, x.shape[1], axis=1)
        num_frames = len(frames)
        states = self.init_states(frames[0].shape)

        total_loss = 0.0
        logits = None
        for frame in frames:
            # A fresh tape per frame: only the ops of this frame's forward
            # pass are recorded for the gradient below.
            with tf.GradientTape() as tape:
                logits, states = self({**states, 'image': frame}, training=True)
                loss = self.compute_loss(
                    y=y, y_pred=logits, sample_weight=sample_weight
                )
            total_loss += loss / num_frames

            gradients = tape.gradient(loss, self.trainable_variables)
            for accumulator, gradient in zip(self.gradient_accumulation, gradients):
                # tape.gradient yields None for variables unconnected to the loss.
                if gradient is not None:
                    accumulator.assign_add(gradient / num_frames)

        self.optimizer.apply_gradients(zip(self.gradient_accumulation, self.trainable_variables))
        # Reset the buffers so the next step starts from zero.
        for accumulator in self.gradient_accumulation:
            accumulator.assign(tf.zeros_like(accumulator))

        return self._update_metrics(y, logits, sample_weight, total_loss)

    def test_step(self, data):
        """Evaluate one batch of clips frame by frame (inference mode).

        Args:
            data: ``(x, y)`` or ``(x, y, sample_weight)``; ``x`` must have a
                statically known size on axis 1.

        Returns:
            Dict mapping metric names to their current values.
        """
        x, y, sample_weight = tf.keras.utils.unpack_x_y_sample_weight(data)
        frames = tf.split(x, x.shape[1], axis=1)
        num_frames = len(frames)
        states = self.init_states(frames[0].shape)

        total_loss = 0.0
        logits = None
        for frame in frames:
            # Evaluation must not run layers in training mode.
            logits, states = self({**states, 'image': frame}, training=False)
            loss = self.compute_loss(
                y=y, y_pred=logits, sample_weight=sample_weight
            )
            # Average the per-frame loss, mirroring train_step.
            total_loss += loss / num_frames

        return self._update_metrics(y, logits, sample_weight, total_loss)

IMAGE_SIZE = 132


class FrameGenerator:
    """Callable generator yielding ``(frames, label)`` pairs for Jester videos.

    Each video is a sub-directory of ``root_dir`` holding numbered frame
    images. Calling the instance iterates over every video listed in the
    annotation CSV and yields an array of shape
    ``(n_frames, IMAGE_SIZE, IMAGE_SIZE, 3)`` scaled to [0, 1] together with
    the integer class index from ``GESTURES_NAMES_INV``.
    """

    # Original Jester label -> coarse gesture class used by this project.
    JESTER_MAPPING = {
        'Swiping Right': 'swipe_r',
        'Swiping Left': 'swipe_l',
        "Swiping Down": 'swipe_d',
        "Swiping Up": 'swipe_u',
        "Pushing Hand Away": 'other',
        "Pulling Hand In": 'other',
        "Sliding Two Fingers Left": 'shake',
        "Sliding Two Fingers Right": 'shake',
        "Sliding Two Fingers Down": 'swipe_d',
        "Sliding Two Fingers Up": 'swipe_u',
        "Pushing Two Fingers Away": 'other',
        "Pulling Two Fingers In": 'other',
        "Rolling Hand Forward": 'shake',
        "Rolling Hand Backward": 'shake',
        "Turning Hand Clockwise": 'shake',
        "Turning Hand Counterclockwise": 'shake',
        "Zooming In With Full Hand": 'other',
        "Zooming Out With Full Hand": 'other',
        "Zooming In With Two Fingers": 'other',
        "Zooming Out With Two Fingers": 'other',
        "Thumb Up": 'other',
        "Thumb Down": 'other',
        "Shaking Hand": 'shake',
        "Stop Sign": 'stop',
        "Drumming Fingers": 'other',
        "No gesture": 'no_gesture',
        "Doing other things": 'other'

    }

    GESTURES_NAMES = ['stop', 'swipe_l', 'swipe_r', 'other', 'no_gesture', 'shake', 'swipe_u', 'swipe_d']
    GESTURES_NAMES_INV = {
        'stop': 0, 'swipe_l': 1, 'swipe_r': 2, 'other': 3,
        'no_gesture': 4, 'shake': 5, 'swipe_u': 6, 'swipe_d': 7
    }

    # Classes whose meaning is exchanged when a clip is mirrored horizontally.
    _MIRRORED_LABELS = {'swipe_r': 'swipe_l', 'swipe_l': 'swipe_r'}

    def __init__(self, mode, batch_size=32):
        """Load the annotation table for ``mode`` and set up augmentation.

        Args:
            mode: ``'train'`` reads the training CSV and enables shuffling and
                augmentation; any other value reads the validation CSV.
            batch_size: Stored on the instance; not used by this class itself.
        """
        self.mode = mode
        self.augmentations = Augmentations(
            prob_rotation=0.35,
            prob_zoom_out=0.35,
            prob_contrast=0.35,
            prob_brightness=0.35,
            prob_perspective=0.35
        )
        if mode == 'train':
            csv_path = '/slot/sandbox/d/in/data/0_data_unpacked/jester-v1-train.csv'
        else:
            csv_path = '/slot/sandbox/d/in/data/0_data_unpacked/jester-v1-validation.csv'
        # Column 0 names the video sub-directory, column 1 is its Jester label.
        annotations = pd.read_csv(csv_path, header=None, sep=';')
        self.dyn_mapping = dict(zip(annotations[0], annotations[1]))

        self.root_dir = "/slot/sandbox/d/in/data/0_data_unpacked/jester_full_data"
        self.batch_size = batch_size

    @staticmethod
    def _frame_index(filename):
        """Return the integer frame number encoded before the first dot of ``filename``."""
        # int() accepts leading zeros, so "00000.jpg" maps to 0.
        return int(filename.split('.')[0])

    @classmethod
    def _mirror_label(cls, gesture):
        """Return the class name that ``gesture`` becomes after a horizontal flip."""
        return cls._MIRRORED_LABELS.get(gesture, gesture)

    @staticmethod
    def _pad_to_square(frame):
        """Centre ``frame`` on a zero-filled float canvas whose side is max(height, width)."""
        height, width = frame.shape[:2]
        side = max(height, width)
        canvas = np.zeros((side, side, 3))
        top = (side - height) // 2
        left = (side - width) // 2
        canvas[top:top + height, left:left + width] = frame
        return canvas

    def __call__(self):
        """Yield ``(frames, label)`` for every annotated video.

        Raises:
            OSError: If a frame image cannot be read.
        """
        subdirs = list(self.dyn_mapping)
        if self.mode == 'train':
            random.shuffle(subdirs)
        for subdir in subdirs:
            subdir_path = os.path.join(self.root_dir, str(subdir))
            images = sorted(
                (name for name in os.listdir(subdir_path)
                 if name.lower().endswith(('.png', '.jpg', '.jpeg'))),
                key=self._frame_index,
            )
            jester_g = self.JESTER_MAPPING[self.dyn_mapping[subdir]]

            # Mirror half of the clips; left/right swipes exchange labels.
            # NOTE(review): this is applied in every mode, not only 'train'.
            mirror = random.random() < 0.5
            if mirror:
                jester_g = self._mirror_label(jester_g)

            vid_frames = []
            for num, file in enumerate(images):
                image_path = os.path.join(subdir_path, file)
                frame = cv2.imread(image_path)
                if frame is None:
                    # cv2.imread signals failure by returning None.
                    raise OSError(f"Could not read frame image: {image_path}")
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                if mirror:
                    # Exact horizontal flip (no interpolation, no blank column).
                    frame = cv2.flip(frame, 1)
                if self.mode == 'train':
                    # New parameters on the first frame, reused for the rest
                    # of the clip.
                    frame = self.augmentations(frame, initialize=(num == 0))

                square = self._pad_to_square(frame)
                resized_image = cv2.resize(square, (IMAGE_SIZE, IMAGE_SIZE), interpolation=cv2.INTER_AREA)
                vid_frames.append(resized_image / 255.0)

            yield np.array(vid_frames), self.GESTURES_NAMES_INV[jester_g]

# Location of the weights file written by the checkpoint callback below.
checkpoint_path = "trained_model/.weights.h5"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Keras callback configured to save only the model weights to
# `checkpoint_path`, with verbose logging enabled.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    verbose=1,
)


def symmetrically_shift_padding(features, labels):
    """Re-centre each clip inside its zero padding along the frame axis.

    For every clip in the batch, the frames that contain any non-zero value
    are counted, the first that-many frames are kept, and zero frames are
    added before and after so the clip keeps its original length with the
    padding split as evenly as possible (the extra frame, if any, goes after).

    NOTE(review): this assumes padding frames are entirely zero and sit at the
    end of the clip; an all-zero frame inside the clip would be miscounted.

    Args:
        features: Batch of clips; indexed here as (batch, frames, d1, d2, d3).
        labels: Passed through unchanged.

    Returns:
        Tuple ``(shifted_features, labels)``.
    """
    max_length = tf.shape(features)[1]

    def shift_padding_single_feature(feature):
        # A frame counts as real content if any of its values is non-zero.
        has_content = tf.reduce_any(feature != 0, axis=[1, 2, 3])
        current_length = tf.reduce_sum(tf.cast(has_content, tf.int32))
        pad_total = max_length - current_length
        pad_before = pad_total // 2
        pad_after = pad_total - pad_before
        return tf.pad(
            feature[:current_length],
            [[pad_before, pad_after], [0, 0], [0, 0], [0, 0]],
        )

    # The frame axis is dynamic after tf.pad; the per-frame dimensions and the
    # dtype are taken from the input rather than hard-coded.
    per_clip_spec = tf.TensorSpec(
        shape=[None] + features.shape[2:].as_list(),
        dtype=features.dtype,
    )
    shifted_features = tf.map_fn(
        shift_padding_single_feature, features, fn_output_signature=per_clip_spec
    )

    return shifted_features, labels





def __main__():
    """Build the input pipelines and a causal MoViNet-A0 classifier, then train it.

    Returns:
        The object returned by ``model.fit``.
    """
    batch_size = 16
    # Single source of truth for the clip length: used for truncation, padding
    # and the static shape assertion, so the three can never disagree.
    max_frames = 40

    output_signature = (tf.TensorSpec(shape=(None, IMAGE_SIZE, IMAGE_SIZE, 3), dtype=tf.float32),
                        tf.TensorSpec(shape=(), dtype=tf.int16))

    def build_dataset(mode, clips_per_batch):
        """Return a dataset of (clips_per_batch, max_frames, H, W, 3) batches for ``mode``."""
        ds = tf.data.Dataset.from_generator(FrameGenerator(mode=mode),
                                            output_signature=output_signature)
        # padded_batch cannot shrink a component, so clips longer than
        # max_frames are cut to the first max_frames frames.
        ds = ds.map(lambda x, y: (x[:max_frames], y))
        ds = ds.padded_batch(
            batch_size=clips_per_batch,
            drop_remainder=True,
            padded_shapes=([max_frames, IMAGE_SIZE, IMAGE_SIZE, 3], [])  # Pad only the first dimension (n_frames)
        )
        ds = ds.map(symmetrically_shift_padding)
        # The per-frame train/test steps split on the static frame dimension,
        # so the full static shape is restored here.
        return ds.map(lambda x, y: (
            tf.ensure_shape(x, (clips_per_batch, max_frames, IMAGE_SIZE, IMAGE_SIZE, 3)), y))

    train_ds = build_dataset('train', batch_size)
    # Validation clips get the same padding/centring as training clips.
    test_ds = build_dataset('val', 1)

    model_id = 'a0'

    tf.keras.backend.clear_session()

    backbone = movinet.Movinet(model_id=model_id,
        causal=True,
        conv_type='2plus1d',
        se_type='2plus3d',
        activation='hard_swish',
        gating_activation='hard_sigmoid',
        use_positional_encoding=False,
        use_external_states=True,
    )
    model = StreamingTrainMovinetClassifier(backbone=backbone, num_classes=8, output_states=True)

    num_epochs = 2

    loss_obj = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    model.compile(loss=loss_obj, optimizer=optimizer, metrics=[
        'accuracy',
    ])

    # Per-class weights, indexed by class id (0-7).
    class_weights = [15.5279, 16.2602, 16.6928, 1.0349, 13.7888, 2.1299, 7.2436, 7.0630]

    results = model.fit(train_ds,
                        validation_data=test_ds,
                        epochs=num_epochs,
                        class_weight=dict(enumerate(class_weights)),
                        validation_freq=1,
                        verbose=1,
                        callbacks=[cp_callback])
    return results


if __name__ == "__main__":
    __main__()
# Leave a Comment  (pastecode.io page residue; not part of the program)