"""Train a streaming (frame-by-frame) MoViNet gesture classifier on Jester.

The script defines:

* ``Augmentations`` - clip-consistent OpenCV image augmentations.
* ``StreamingTrainMovinetClassifier`` - a MoViNet classifier whose train/test
  steps feed one frame at a time while carrying the stream states forward.
* ``FrameGenerator`` - a Python generator yielding ``(frames, label)`` clips.
* ``symmetrically_shift_padding`` - centres the zero padding of a clip batch.
* ``__main__`` - builds the datasets and the model, then runs ``fit``.
"""

import tqdm
import random
import pathlib
import itertools
import collections
import os
import pandas as pd
import cv2
import numpy as np
import matplotlib.pyplot as plt
import keras
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_model


class Augmentations:
    """Random image augmentations whose parameters are shared across calls.

    Parameters are sampled by ``initialize_params`` and cached in
    ``self.params``; subsequent calls re-use them until ``initialize=True`` is
    passed again. This lets a caller apply the same transform to every frame
    of one clip.

    Args:
        prob_rotation: Probability of enabling the rotation.
        prob_zoom_out: Probability of enabling the zoom-out.
        prob_contrast: Probability of enabling the contrast scaling.
        prob_brightness: Probability of enabling the brightness offset.
        prob_perspective: Probability of enabling the perspective warp.
    """

    def __init__(self, prob_rotation=0.5, prob_zoom_out=0.5, prob_contrast=0.5,
                 prob_brightness=0.5, prob_perspective=0.5):
        self.max_rotation = 8  # degrees, sampled from [-max, +max]
        self.max_zoom_out = 0.7  # zoom factor is sampled from [1 - max, 1]
        self.contrast_range = (0.8, 1.6)
        self.brightness_range = (0.8, 1.4)
        # Corner jitter as a fraction of the image width.
        self.perspective_transform_range = 0.03

        self.prob_rotation = prob_rotation
        self.prob_zoom_out = prob_zoom_out
        self.prob_contrast = prob_contrast
        self.prob_brightness = prob_brightness
        self.prob_perspective = prob_perspective

        self.params = None

    def initialize_params(self, image):
        """Sample a fresh set of augmentation parameters into ``self.params``.

        A disabled augmentation is stored as ``None``.

        Args:
            image: Array whose first two dimensions are (height, width); only
                its shape is used, to size the perspective warp.
        """
        params = {}

        if np.random.rand() < self.prob_rotation:
            params['rotation'] = np.random.uniform(-self.max_rotation, self.max_rotation)
        else:
            params['rotation'] = None

        if np.random.rand() < self.prob_zoom_out:
            params['zoom_factor'] = 1 - np.random.uniform(0, self.max_zoom_out)
        else:
            params['zoom_factor'] = None

        if np.random.rand() < self.prob_contrast:
            params['contrast_alpha'] = np.random.uniform(*self.contrast_range)
        else:
            params['contrast_alpha'] = None

        if np.random.rand() < self.prob_brightness:
            params['brightness_beta'] = np.random.uniform(*self.brightness_range)
        else:
            params['brightness_beta'] = None

        if np.random.rand() < self.prob_perspective:
            h, w = image.shape[:2]
            src_points = np.float32([[0, 0], [w, 0], [0, h], [w, h]])
            jitter = np.random.uniform(
                -self.perspective_transform_range * w,
                self.perspective_transform_range * w,
                src_points.shape,
            ).astype(np.float32)
            dst_points = src_points + jitter
            params['perspective_matrix'] = cv2.getPerspectiveTransform(src_points, dst_points)
        else:
            params['perspective_matrix'] = None

        self.params = params

    def apply_augmentations(self, image):
        """Apply every enabled augmentation from ``self.params`` to ``image``.

        The order is rotation, zoom-out, contrast, brightness, perspective.
        """
        if self.params['rotation'] is not None:
            image = self.apply_rotation(image, self.params['rotation'])
        if self.params['zoom_factor'] is not None:
            image = self.apply_zoom_out(image, self.params['zoom_factor'])
        if self.params['contrast_alpha'] is not None:
            image = self.apply_contrast(image, self.params['contrast_alpha'])
        if self.params['brightness_beta'] is not None:
            image = self.apply_brightness(image, self.params['brightness_beta'])
        if self.params['perspective_matrix'] is not None:
            image = self.apply_perspective_transform(image, self.params['perspective_matrix'])
        return image

    def apply_rotation(self, image, angle):
        """Rotate ``image`` by ``angle`` degrees around its centre, same size."""
        h, w = image.shape[:2]
        matrix = cv2.getRotationMatrix2D((w / 2, h / 2), angle, 1)
        return cv2.warpAffine(image, matrix, (w, h))

    def apply_zoom_out(self, image, zoom_factor):
        """Shrink ``image`` by ``zoom_factor`` and centre it on a zero canvas."""
        h, w = image.shape[:2]
        nh, nw = int(h * zoom_factor), int(w * zoom_factor)
        resized_image = cv2.resize(image, (nw, nh))
        new_image = np.zeros_like(image)
        top, left = (h - nh) // 2, (w - nw) // 2
        new_image[top:top + nh, left:left + nw] = resized_image
        return new_image

    def apply_contrast(self, image, alpha):
        """Scale pixel values by ``alpha`` (saturated to 8 bits)."""
        return cv2.convertScaleAbs(image, alpha=alpha, beta=0)

    def apply_brightness(self, image, beta):
        """Add ``beta`` to pixel values (saturated to 8 bits).

        NOTE(review): ``beta`` is an additive offset here, so values drawn
        from ``brightness_range`` (0.8-1.4) shift pixels by about one
        intensity level. Confirm whether a multiplicative factor or a larger
        offset was intended before changing it.
        """
        return cv2.convertScaleAbs(image, alpha=1, beta=beta)

    def apply_perspective_transform(self, image, matrix):
        """Warp ``image`` with the 3x3 ``matrix``, keeping the original size."""
        h, w = image.shape[:2]
        return cv2.warpPerspective(image, matrix, (w, h))

    def __call__(self, image, initialize=False):
        """Augment ``image``, re-sampling parameters first when requested.

        Args:
            image: Image array to augment.
            initialize: When true (or when no parameters exist yet), sample
                new parameters before applying them.
        """
        if initialize or self.params is None:
            self.initialize_params(image)
        return self.apply_augmentations(image)


class StreamingTrainMovinetClassifier(movinet_model.MovinetClassifier):
    """MoViNet classifier trained and evaluated one frame at a time.

    Each step splits the clip along axis 1 and feeds the frames sequentially,
    passing the states returned for one frame into the call for the next.
    During training the per-frame gradients are averaged in
    ``gradient_accumulation`` and applied once per batch.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.gradient_accumulation = [
            tf.Variable(tf.zeros_like(v, dtype=tf.float32), trainable=False)
            for v in self.trainable_variables
        ]

    def _update_and_collect_metrics(self, y, logits, sample_weight, total_loss):
        """Update the metrics from the final-frame logits and return results."""
        self.compiled_metrics.update_state(y, logits, sample_weight)
        for metric in self.metrics:
            if metric.name == "classifier_head_loss":
                continue
            if metric.name == "loss":
                metric.update_state(total_loss)
            else:
                metric.update_state(y, logits)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

    def train_step(self, data):
        """Run one optimisation step over a batch of clips.

        Args:
            data: ``(x, y)`` or ``(x, y, sample_weight)``; axis 1 of ``x`` is
                split into single frames and must have a static size.

        Returns:
            Dict mapping metric names to their current values.
        """
        x, y, sample_weight = tf.keras.utils.unpack_x_y_sample_weight(data)
        frames = tf.split(x, x.shape[1], axis=1)
        num_frames = len(frames)
        states = self.init_states(frames[0].shape)

        total_loss = 0.0
        for frame in frames:
            with tf.GradientTape() as tape:
                logits, states = self({**states, 'image': frame}, training=True)
                loss = self.compute_loss(
                    y=y, y_pred=logits, sample_weight=sample_weight
                )
            total_loss += loss / num_frames
            gradients = tape.gradient(loss, self.trainable_variables)
            for accumulator, gradient in zip(self.gradient_accumulation, gradients):
                # Variables that did not take part in this frame's loss have
                # no gradient; leave their accumulator untouched.
                if gradient is not None:
                    accumulator.assign_add(gradient / num_frames)

        self.optimizer.apply_gradients(
            zip(self.gradient_accumulation, self.trainable_variables)
        )
        for accumulator in self.gradient_accumulation:
            accumulator.assign(tf.zeros_like(accumulator))

        return self._update_and_collect_metrics(y, logits, sample_weight, total_loss)

    def test_step(self, data):
        """Evaluate one batch of clips frame by frame, in inference mode.

        Args:
            data: ``(x, y)`` or ``(x, y, sample_weight)``; a missing
                ``sample_weight`` is treated as ``None``.

        Returns:
            Dict mapping metric names to their current values.
        """
        x, y, sample_weight = tf.keras.utils.unpack_x_y_sample_weight(data)
        frames = tf.split(x, x.shape[1], axis=1)
        num_frames = len(frames)
        states = self.init_states(frames[0].shape)

        total_loss = 0.0
        for frame in frames:
            logits, states = self({**states, 'image': frame}, training=False)
            loss = self.compute_loss(
                y=y, y_pred=logits, sample_weight=sample_weight
            )
            # Mirror train_step: report the loss averaged over the frames.
            total_loss += loss / num_frames

        return self._update_and_collect_metrics(y, logits, sample_weight, total_loss)


IMAGE_SIZE = 132
# Number of frames every clip is truncated/padded to before batching.
NUM_FRAMES = 40


class FrameGenerator:
    """Callable yielding ``(frames, label)`` pairs, one per Jester clip.

    ``frames`` is a float32 array of shape (n, IMAGE_SIZE, IMAGE_SIZE, 3) with
    values scaled by 1/255, and ``label`` is an index into ``GESTURES_NAMES``.
    """

    TRAIN_CSV = '/slot/sandbox/d/in/data/0_data_unpacked/jester-v1-train.csv'
    VALIDATION_CSV = '/slot/sandbox/d/in/data/0_data_unpacked/jester-v1-validation.csv'
    ROOT_DIR = "/slot/sandbox/d/in/data/0_data_unpacked/jester_full_data"
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    JESTER_MAPPING = {
        'Swiping Right': 'swipe_r',
        'Swiping Left': 'swipe_l',
        "Swiping Down": 'swipe_d',
        "Swiping Up": 'swipe_u',
        "Pushing Hand Away": 'other',
        "Pulling Hand In": 'other',
        "Sliding Two Fingers Left": 'shake',
        "Sliding Two Fingers Right": 'shake',
        "Sliding Two Fingers Down": 'swipe_d',
        "Sliding Two Fingers Up": 'swipe_u',
        "Pushing Two Fingers Away": 'other',
        "Pulling Two Fingers In": 'other',
        "Rolling Hand Forward": 'shake',
        "Rolling Hand Backward": 'shake',
        "Turning Hand Clockwise": 'shake',
        "Turning Hand Counterclockwise": 'shake',
        "Zooming In With Full Hand": 'other',
        "Zooming Out With Full Hand": 'other',
        "Zooming In With Two Fingers": 'other',
        "Zooming Out With Two Fingers": 'other',
        "Thumb Up": 'other',
        "Thumb Down": 'other',
        "Shaking Hand": 'shake',
        "Stop Sign": 'stop',
        "Drumming Fingers": 'other',
        "No gesture": 'no_gesture',
        "Doing other things": 'other',
    }
    GESTURES_NAMES = ['stop', 'swipe_l', 'swipe_r', 'other', 'no_gesture', 'shake', 'swipe_u', 'swipe_d']
    GESTURES_NAMES_INV = {
        'stop': 0,
        'swipe_l': 1,
        'swipe_r': 2,
        'other': 3,
        'no_gesture': 4,
        'shake': 5,
        'swipe_u': 6,
        'swipe_d': 7,
    }

    def __init__(self, mode, batch_size=32):
        """Load the clip-id -> Jester label mapping for the requested split.

        Args:
            mode: ``'train'`` reads the training CSV and enables shuffling
                and augmentation; any other value reads the validation CSV.
            batch_size: Stored on the instance; not used by the generator.
        """
        self.mode = mode
        self.augmentations = Augmentations(
            prob_rotation=0.35,
            prob_zoom_out=0.35,
            prob_contrast=0.35,
            prob_brightness=0.35,
            prob_perspective=0.35,
        )
        csv_path = self.TRAIN_CSV if mode == 'train' else self.VALIDATION_CSV
        # Read the CSV once; column 0 is the clip id, column 1 the label.
        annotations = pd.read_csv(csv_path, header=None, sep=';')
        self.dyn_mapping = dict(zip(annotations[0], annotations[1]))
        self.root_dir = self.ROOT_DIR
        self.batch_size = batch_size

    @staticmethod
    def _frame_index(file_name):
        """Return the integer before the first dot of ``file_name``."""
        # int() already ignores leading zeros, and unlike lstrip('0') it
        # also copes with an all-zero name such as '00000.jpg'.
        return int(file_name.split('.')[0])

    def _prepare_frame(self, image_path, flip, reset_augmentations):
        """Read one frame and return it as a square, scaled float32 image.

        Args:
            image_path: Path of the image file to read.
            flip: Mirror the frame horizontally when true.
            reset_augmentations: Re-sample augmentation parameters (done on
                the first frame of a clip so the whole clip shares them).

        Raises:
            ValueError: If OpenCV cannot read ``image_path``.
        """
        frame = cv2.imread(image_path)
        if frame is None:
            raise ValueError(f"Could not read image: {image_path}")
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        if flip:
            # cv2.flip mirrors exactly; the previous warp matrix mapped x to
            # width - x, which shifted the image by one pixel and left
            # column 0 black.
            frame = cv2.flip(frame, 1)
        if self.mode == 'train':
            frame = self.augmentations(frame, initialize=reset_augmentations)

        # Zero-pad to a square so the resize keeps the aspect ratio.
        height, width = frame.shape[:2]
        side = max(height, width)
        top = (side - height) // 2
        left = (side - width) // 2
        padded_img = np.zeros((side, side, 3), dtype=np.float32)
        padded_img[top:top + height, left:left + width] = frame

        resized_image = cv2.resize(
            padded_img, (IMAGE_SIZE, IMAGE_SIZE), interpolation=cv2.INTER_AREA
        )
        return resized_image / 255.0

    def __call__(self):
        """Yield ``(frames, label)`` for every clip listed in the CSV.

        Clips are shuffled in ``'train'`` mode. With probability 0.5 a clip is
        mirrored horizontally, in which case left/right swipe labels are
        swapped. Directories without any image file are skipped.
        """
        subdirs = list(self.dyn_mapping.keys())
        if self.mode == 'train':
            random.shuffle(subdirs)

        for subdir in subdirs:
            subdir_path = os.path.join(self.root_dir, str(subdir))
            images = [
                name for name in os.listdir(subdir_path)
                if name.lower().endswith(self.IMAGE_EXTENSIONS)
            ]
            if not images:
                continue
            images.sort(key=self._frame_index)

            jester_g = self.JESTER_MAPPING[self.dyn_mapping[int(subdir)]]
            flip = random.random() < 0.5
            if flip:
                if jester_g == 'swipe_r':
                    jester_g = 'swipe_l'
                elif jester_g == 'swipe_l':
                    jester_g = 'swipe_r'

            vid_frames = [
                self._prepare_frame(os.path.join(subdir_path, name), flip, num == 0)
                for num, name in enumerate(images)
            ]
            result = np.array(vid_frames)
            label = self.GESTURES_NAMES_INV[jester_g]
            yield result, label


checkpoint_path = "trained_model/.weights.h5"
checkpoint_dir = os.path.dirname(checkpoint_path)

cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=True,
                                                 verbose=1)


def symmetrically_shift_padding(features, labels):
    """Re-centre each clip so its zero padding is split before and after.

    The length of a clip is taken as the number of frames that contain at
    least one non-zero value; the first that many frames are kept and padded
    on both sides back to the batch's frame count. A genuine all-zero frame
    inside a clip is therefore counted as padding.

    Args:
        features: Float tensor indexed as (batch, frame, height, width,
            channel) with trailing dimensions (IMAGE_SIZE, IMAGE_SIZE, 3).
        labels: Passed through unchanged.

    Returns:
        Tuple ``(shifted_features, labels)``.
    """
    max_length = tf.shape(features)[1]

    def shift_padding_single_feature(feature):
        has_content = tf.reduce_any(feature != 0, axis=[1, 2, 3])
        current_length = tf.reduce_sum(tf.cast(has_content, tf.int32))
        pad_total = max_length - current_length
        pad_before = pad_total // 2
        pad_after = pad_total - pad_before
        return tf.pad(
            feature[:current_length],
            [[pad_before, pad_after], [0, 0], [0, 0], [0, 0]],
        )

    shifted_features = tf.map_fn(
        shift_padding_single_feature,
        features,
        fn_output_signature=tf.TensorSpec(
            shape=(None, IMAGE_SIZE, IMAGE_SIZE, 3), dtype=tf.float32
        ),
    )
    return shifted_features, labels


def _make_dataset(mode, batch_size, drop_remainder):
    """Build a batched dataset of clips with exactly ``NUM_FRAMES`` frames."""
    output_signature = (
        tf.TensorSpec(shape=(None, IMAGE_SIZE, IMAGE_SIZE, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int16),
    )
    dataset = tf.data.Dataset.from_generator(
        FrameGenerator(mode=mode), output_signature=output_signature
    )
    # padded_batch rejects elements longer than the padded shape, so longer
    # clips are cut to their first NUM_FRAMES frames.
    dataset = dataset.map(lambda frames, label: (frames[:NUM_FRAMES], label))
    dataset = dataset.padded_batch(
        batch_size=batch_size,
        drop_remainder=drop_remainder,
        padded_shapes=([NUM_FRAMES, IMAGE_SIZE, IMAGE_SIZE, 3], []),
    )
    dataset = dataset.map(symmetrically_shift_padding)
    # The streaming steps split on axis 1, which needs a static frame count.
    return dataset.map(
        lambda x, y: (
            tf.ensure_shape(x, (batch_size, NUM_FRAMES, IMAGE_SIZE, IMAGE_SIZE, 3)),
            y,
        )
    )


def __main__():
    """Build the datasets and the streaming MoViNet model, then train it.

    Returns:
        The value returned by ``model.fit``.
    """
    batch_size = 16

    train_ds = _make_dataset('train', batch_size, drop_remainder=True)
    test_ds = _make_dataset('val', 1, drop_remainder=False)

    model_id = 'a0'

    tf.keras.backend.clear_session()

    backbone = movinet.Movinet(
        model_id=model_id,
        causal=True,
        conv_type='2plus1d',
        se_type='2plus3d',
        activation='hard_swish',
        gating_activation='hard_sigmoid',
        use_positional_encoding=False,
        use_external_states=True,
    )
    model = StreamingTrainMovinetClassifier(
        backbone=backbone, num_classes=8, output_states=True
    )

    num_epochs = 2
    loss_obj = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    model.compile(loss=loss_obj, optimizer=optimizer, metrics=['accuracy'])

    # One weight per class index, in GESTURES_NAMES order.
    class_weights = [15.5279, 16.2602, 16.6928, 1.0349, 13.7888, 2.1299, 7.2436, 7.0630]
    results = model.fit(
        train_ds,
        validation_data=test_ds,
        epochs=num_epochs,
        class_weight=dict(enumerate(class_weights)),
        validation_freq=1,
        verbose=1,
        callbacks=[cp_callback],
    )
    return results


if __name__ == "__main__":
    __main__()
