import os
from typing import Dict
import cv2
import numpy as np
import torch
import torchvision
from models._base.base_extractor import BaseExtractor
from models.i3d.i3d_src.i3d_net import I3D
from models.transforms import (Clamp, PermuteAndUnsqueeze, PILToTensor,
ResizeImproved, ScaleTo1_1, TensorCenterCrop,
ToFloat, ToUInt8)
from utils.io import reencode_video_with_diff_fps
from utils.utils import dp_state_to_normal, show_predictions_on_dataset
class ExtractI3D(BaseExtractor):
def __init__(self, args) -> None:
# init the BaseExtractor
super().__init__(
feature_type=args.feature_type,
on_extraction=args.on_extraction,
tmp_path=args.tmp_path,
output_path=args.output_path,
keep_tmp_files=args.keep_tmp_files,
device=args.device,
)
# (Re-)Define arguments for this class
self.streams = ['rgb', 'flow'] if args.streams is None else [args.streams]
self.flow_type = args.flow_type
self.i3d_classes_num = 400
self.min_side_size = 256
self.central_crop_size = 224
self.extraction_fps = args.extraction_fps
self.step_size = 64 if args.step_size is None else args.step_size
self.stack_size = 64 if args.stack_size is None else args.stack_size
self.resize_transforms = torchvision.transforms.Compose([
torchvision.transforms.ToPILImage(),
ResizeImproved(self.min_side_size),
PILToTensor(),
ToFloat(),
])
self.i3d_transforms = {
'rgb': torchvision.transforms.Compose([
TensorCenterCrop(self.central_crop_size),
ScaleTo1_1(),
PermuteAndUnsqueeze()
]),
'flow': torchvision.transforms.Compose([
TensorCenterCrop(self.central_crop_size),
Clamp(-20, 20),
ToUInt8(),
ScaleTo1_1(),
PermuteAndUnsqueeze()
])
}
# TV-L1 optical flow parameters
if self.flow_type == 'tvl1':
self.tvl1 = cv2.optflow.DualTVL1OpticalFlow_create(
tau=0.25,
lambda_=0.15,
theta=0.3,
nscales=5,
warps=5,
epsilon=0.01,
innnerIterations=30,
outerIterations=10,
scaleStep=0.8,
gamma=0.0,
medianFiltering=5
)
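            # Note: `cv2.optflow.DualTVL1OpticalFlow_create` is provided by the
            # contrib `optflow` module (opencv-contrib-python). The triple-'n'
            # spelling of `innnerIterations` follows the parameter name exposed
            # by the OpenCV binding itself, so it is kept as-is here.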
self.show_pred = args.show_pred
self.output_feat_keys = self.streams + ['fps', 'timestamps_ms']
self.name2module = self.load_model()
@torch.no_grad()
def extract(self, video_path: str) -> Dict[str, np.ndarray]:
"""The extraction call. Made to clean the forward call a bit.
Arguments:
video_path (str): a video path from which to extract features
Returns:
Dict[str, np.ndarray]: feature name (e.g. 'fps' or feature_type) to the feature tensor
"""
# take the video, change fps and save to the tmp folder
if self.extraction_fps is not None:
video_path = reencode_video_with_diff_fps(video_path, self.tmp_path, self.extraction_fps)
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
# timestamp when the last frame in the stack begins (when the old frame of the last pair ends)
timestamps_ms = []
rgb_stack = []
feats_dict = {stream: [] for stream in self.streams}
# sometimes when the target fps is 1 or 2, the first frame of the reencoded video is missing
# and cap.read returns None but the rest of the frames are ok. timestep is 0.0 for the 2nd frame in
# this case
first_frame = True
stack_counter = 0
while cap.isOpened():
frame_exists, rgb = cap.read()
if first_frame:
first_frame = False
if frame_exists is False:
continue
if frame_exists:
# preprocess the image
rgb = cv2.cvtColor(rgb, cv2.COLOR_BGR2RGB)
rgb = self.resize_transforms(rgb)
rgb = rgb.unsqueeze(0)
rgb_stack.append(rgb)
# - 1 is used because we need B+1 frames to calculate B frames
if len(rgb_stack) - 1 == self.stack_size:
batch_feats_dict = self.run_on_a_stack(rgb_stack, stack_counter)
for stream in self.streams:
feats_dict[stream].extend(batch_feats_dict[stream].tolist())
                    # keep the trailing frames if step_size < stack_size so they are not loaded again;
                    # if step_size == stack_size, one frame is kept because flow is calculated between
                    # the last frame of the previous stack and the first frame of the current one
rgb_stack = rgb_stack[self.step_size:]
stack_counter += 1
timestamps_ms.append(cap.get(cv2.CAP_PROP_POS_MSEC))
else:
# we don't run inference if the stack is not full (applicable for i3d)
cap.release()
break
# removes the video with different fps if it was created to preserve disk space
if (self.extraction_fps is not None) and (not self.keep_tmp_files):
os.remove(video_path)
# transforms list of features into a np array
feats_dict = {stream: np.array(feats) for stream, feats in feats_dict.items()}
# also include the timestamps and fps
feats_dict['fps'] = np.array(fps)
feats_dict['timestamps_ms'] = np.array(timestamps_ms)
return feats_dict
def calculate_tvl1_flow(self, frame1: np.ndarray, frame2: np.ndarray) -> np.ndarray:
"""Calculate optical flow using TV-L1 algorithm.
Args:
frame1 (np.ndarray): First frame
frame2 (np.ndarray): Second frame
Returns:
np.ndarray: Optical flow in the format expected by I3D
"""
# Convert to grayscale
gray1 = cv2.cvtColor(frame1, cv2.COLOR_RGB2GRAY)
gray2 = cv2.cvtColor(frame2, cv2.COLOR_RGB2GRAY)
# Calculate flow
flow = self.tvl1.calc(gray1, gray2, None)
        # `flow` is an (H, W, 2) array already holding the [u, v] components expected by I3D
return flow
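        # Illustrative standalone use (hypothetical frame variables, not part of the pipeline):
        #   flow = self.calculate_tvl1_flow(prev_rgb, next_rgb)  # (H, W, 2) float32
        #   u, v = flow[..., 0], flow[..., 1]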
def run_on_a_stack(self, rgb_stack, stack_counter) -> Dict[str, torch.Tensor]:
models = self.name2module['model']
rgb_stack_cpu = torch.cat(rgb_stack).cpu()
batch_feats_dict = {}
for stream in self.streams:
            # if the stream is 'flow', optical flow is calculated between consecutive frames first;
            # for 'rgb' the last frame is dropped (`[:-1]`) so that both streams yield the same
            # number of features regardless of whether rgb, flow, or both are extracted
if stream == 'flow':
if self.flow_type == 'tvl1':
# Convert tensors to numpy arrays and prepare flow stack
flow_stack = []
for i in range(len(rgb_stack_cpu) - 1):
# Convert from tensor [C,H,W] to numpy [H,W,C]
frame1 = rgb_stack_cpu[i].numpy().transpose(1, 2, 0)
frame2 = rgb_stack_cpu[i + 1].numpy().transpose(1, 2, 0)
# Scale to 0-255 range for TV-L1
frame1 = (frame1 * 255).astype(np.uint8)
frame2 = (frame2 * 255).astype(np.uint8)
# Calculate flow
flow = self.calculate_tvl1_flow(frame1, frame2)
# Convert back to tensor format [C,H,W]
flow_tensor = torch.from_numpy(flow.transpose(2, 0, 1)).float()
flow_stack.append(flow_tensor)
# Stack all flows
stream_slice = torch.stack(flow_stack).to(self.device)
else:
raise NotImplementedError(f"Flow type {self.flow_type} not implemented")
elif stream == 'rgb':
stream_slice = rgb_stack_cpu[:-1].to(self.device)
else:
raise NotImplementedError(f"Stream {stream} not implemented")
# apply transforms depending on the stream (flow or rgb)
stream_slice = self.i3d_transforms[stream](stream_slice)
# extract features for a stream
batch_feats_dict[stream] = models[stream](stream_slice, features=True) # (B, 1024)
            # optionally show Kinetics predictions for the current stack
self.maybe_show_pred(stream_slice, self.name2module['model'][stream], stack_counter)
return batch_feats_dict
def load_model(self) -> Dict[str, torch.nn.Module]:
"""Defines the models, loads checkpoints, sends them to the device.
        Since I3D is two-stream, a separate I3D module is loaded for each requested stream.
Returns:
Dict[str, torch.nn.Module]: model-agnostic dict holding modules for extraction and show_pred
"""
i3d_weights_paths = {
'rgb': './models/i3d/checkpoints/i3d_rgb.pt',
'flow': './models/i3d/checkpoints/i3d_flow.pt',
}
name2module = {}
# Feature extraction models (rgb and flow streams)
i3d_stream_models = {}
for stream in self.streams:
i3d_stream_model = I3D(num_classes=self.i3d_classes_num, modality=stream)
i3d_stream_model.load_state_dict(torch.load(i3d_weights_paths[stream], map_location='cpu'))
i3d_stream_model = i3d_stream_model.to(self.device)
i3d_stream_model.eval()
i3d_stream_models[stream] = i3d_stream_model
name2module['model'] = i3d_stream_models
return name2module
def maybe_show_pred(self, stream_slice: torch.Tensor, model: torch.nn.Module, stack_counter: int) -> None:
if self.show_pred:
softmaxes, logits = model(stream_slice, features=False)
print(f'At stack {stack_counter} ({model.modality} stream)')
            show_predictions_on_dataset(logits, 'kinetics')
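
# -----------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module). The
# attribute names mirror the ones read in ExtractI3D.__init__; the video path,
# device string and on_extraction value are placeholder assumptions.
# -----------------------------------------------------------------------------
if __name__ == '__main__':
    from types import SimpleNamespace

    args = SimpleNamespace(
        feature_type='i3d', on_extraction='print', tmp_path='./tmp',
        output_path='./output', keep_tmp_files=False, device='cuda:0',
        streams=None, flow_type='tvl1', extraction_fps=None,
        step_size=None, stack_size=None, show_pred=False,
    )
    extractor = ExtractI3D(args)
    feats = extractor.extract('./sample/video.mp4')  # hypothetical input path
    # each stream maps to an (N, 1024) array (see the (B, 1024) comment in
    # run_on_a_stack); 'fps' and 'timestamps_ms' hold video metadata
    for name, value in feats.items():
        print(name, getattr(value, 'shape', value))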