# Paste-page header (not part of the script):
# Untitled | unknown | plain_text | 3 months ago | 20 kB | 4 | Indexable
import os
import sys
import json
from pathlib import Path
import glob
import random

import numpy as np
from tqdm import tqdm
import pandas as pd
from sklearn.metrics import roc_auc_score, roc_curve, auc

# plotting
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# DeepLearning
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.optim as optim

print("Imports OK")
print(np.__version__)
print(torch.__version__)

# DATASET SPLITS
DATA_FOLDER = 'Shapenetcore_benchmark/'

class_name_id_map = {
    'Airplane': 0, 'Bag': 1, 'Cap': 2, 'Car': 3,
    'Chair': 4, 'Earphone': 5, 'Guitar': 6, 'Knife': 7,
    'Lamp': 8, 'Laptop': 9, 'Motorbike': 10, 'Mug': 11,
    'Pistol': 12, 'Rocket': 13, 'Skateboard': 14, 'Table': 15,
}
#class_name_id_map = { 'Car': 3,'Motorbike': 10,}
class_id_name_map = {v: k for k, v in class_name_id_map.items()}

PCD_SCENE = dict(xaxis=dict(visible=False),
                 yaxis=dict(visible=False),
                 zaxis=dict(visible=False),
                 aspectmode='data')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Use a context manager so the split file handle is closed deterministically.
with open(os.path.join(DATA_FOLDER, 'train_split.json'), 'r') as f:
    train_split_data = json.load(f)
train_class_count = np.array([x[0] for x in train_split_data])

# plot classwise count in train set
# minlength keeps one bar per class name even when the highest class ids are
# absent from the split (otherwise x and y would have different lengths).
train_dist_plots = [go.Bar(x=list(class_name_id_map.keys()),
                           y=np.bincount(train_class_count,
                                         minlength=len(class_name_id_map)))]
layout = dict(title="Train dataset", title_x=0.5)
#fig = go.Figure(data=train_dist_plots, layout=layout)
#fig.show()

points_list = glob.glob(os.path.join(DATA_FOLDER, "04379243/points/*.npy"))
print(len(points_list))
if not points_list:
    raise FileNotFoundError(
        f"No point clouds found under {DATA_FOLDER!r}; check the dataset path")

# LOAD A POINT CLOUD
# Random point cloud. randrange excludes the upper bound; randint(0, len)
# could return len(points_list) and index one past the end of the list.
idx = random.randrange(len(points_list))

# load random point cloud
points = np.load(points_list[idx])
print(f"points shape = {points.shape}, min xyz = {np.min(points, axis=0)}, max xyz = {np.max(points, axis=0)}")

# load segmentation labels
seg_file_path = points_list[idx].replace('points', 'points_label').replace('.npy', '.seg')
seg_labels = np.loadtxt(seg_file_path).astype(np.int8)
print(f"seg_labels shape = {seg_labels.shape}, unique labels = {np.unique(seg_labels)}")

# There are max of 16 parts in an object in Shapenet core dataset
# Creating random colors according to part label
NUM_PARTS = 16
PART_COLORS = np.random.choice(range(255), size=(NUM_PARTS, 3))

# Create a color array for each point based on its segment label
# (labels read from the .seg file are 1-based, hence the -1)
colors = PART_COLORS[seg_labels - 1] / 255.0  # Normalize to [0, 1] range

# Create a 3D scatter plot using plotly
fig = go.Figure(data=[go.Scatter3d(
    x=points[:, 0],
    y=points[:, 1],
    z=points[:, 2],
    mode='markers',
    marker=dict(
        size=2,
        color=colors,  # Set color to the colors array
        opacity=0.8
    )
)])

# Set layout for the plot
fig.update_layout(
    template="plotly_dark",
    title="Raw Point Cloud",
    scene=dict(
        xaxis_title='X',
        yaxis_title='Y',
        zaxis_title='Z'
    ),
    title_x=0.5
)

# Show the plot
#fig.show()


class ShapeNetDataset(torch.utils.data.Dataset):
    """Point clouds and per-point part labels listed in a split file.

    Each entry of ``<root_dir>/<split_type>_split.json`` is unpacked as
    ``(class_id, class_name, point_cloud_path, seg_label_path)``. Every item
    is resampled to exactly ``num_samples`` points.
    """

    def __init__(self, root_dir, split_type, num_samples=2500):
        self.root_dir = root_dir
        self.split_type = split_type
        self.num_samples = num_samples
        with open(os.path.join(root_dir, f'{self.split_type}_split.json'), 'r') as f:
            self.split_data = json.load(f)

    def __getitem__(self, index):
        """Return a dict with keys class_id, class_name, points, seg_labels."""
        # read point cloud data
        class_id, class_name, point_cloud_path, seg_label_path = self.split_data[index]

        # point cloud data
        pc_data = np.load(os.path.join(self.root_dir, point_cloud_path))

        # segmentation labels
        # -1 is to change part values from [1-16] to [0-15]
        # which helps when running segmentation
        pc_seg_labels = np.loadtxt(
            os.path.join(self.root_dir, seg_label_path)).astype(np.int8) - 1

        # Sample fixed number of points
        num_points = pc_data.shape[0]
        if num_points < self.num_samples:
            # Too few points: keep all of them and pad with random duplicates.
            additional_indices = np.random.choice(
                num_points, self.num_samples - num_points, replace=True)
            pc_data = np.concatenate((pc_data, pc_data[additional_indices]), axis=0)
            pc_seg_labels = np.concatenate(
                (pc_seg_labels, pc_seg_labels[additional_indices]), axis=0)
        else:
            # Enough points: draw without replacement so no point is
            # duplicated while other points are dropped.
            random_indices = np.random.choice(
                num_points, self.num_samples, replace=False)
            pc_data = pc_data[random_indices]
            pc_seg_labels = pc_seg_labels[random_indices]

        return {
            'class_id': class_id,
            'class_name': class_name,
            'points': pc_data,
            'seg_labels': pc_seg_labels,
        }

    def __len__(self):
        return len(self.split_data)


train_set = ShapeNetDataset(root_dir=DATA_FOLDER, split_type='train')
val_set = ShapeNetDataset(root_dir=DATA_FOLDER, split_type='val')
test_set = ShapeNetDataset(root_dir=DATA_FOLDER, split_type='test')


def showDataLayout():
    """Plot the train/val/test set sizes, save to data2018.png and show."""
    plt.bar(["Train", "Valid", "Test"],
            [len(train_set), len(val_set), len(test_set)],
            align='center', color=['black', 'red', 'blue'])
    plt.legend()
    plt.ylabel('Number of images')
    plt.title('Data distribution')
    plt.savefig('data2018.png')
    plt.show()


#print(len(train_set))
#print(len(val_set))
#print(len(test_set))
#showDataLayout()

data_dict = train_set[18]
print(f"class in dataset sample = {list(data_dict.keys())}")
points = data_dict['points']
seg_labels = data_dict['seg_labels']
print(f"class_id = {data_dict['class_id']}, class_name = {data_dict['class_name']}")

# Colors must be rebuilt for this sample: the earlier `colors` array belongs
# to a different cloud with a different number of points. Dataset labels are
# already shifted to be 0-based, so no -1 here.
colors = PART_COLORS[seg_labels] / 255.0

# Create a 3D scatter plot using plotly
pc_plots = go.Scatter3d(
    x=points[:, 0],
    y=points[:, 1],
    z=points[:, 2],
    mode='markers',
    marker=dict(
        size=2,
        color=colors,  # Set color to the colors array
        opacity=0.8
    )
)

# Set layout for the plot
layout = dict(
    template="plotly_dark",
    title=f"{data_dict['class_name']}, class id = {data_dict['class_id']}, Shapenet Dataset",
    scene=PCD_SCENE,
    title_x=0.5
)

# Create the figure and show the plot
fig = go.Figure(data=[pc_plots], layout=layout)
#fig.show()

# processing a list of batch items, each containing
# information about a 3D point cloud
def collate_fn(batch_list):
    """Merge a list of dataset items into one batch dict.

    class_id and seg_labels become long tensors, points a float tensor
    (all stacked along a new first axis); class_name stays a numpy array.
    """
    ret = {}
    ret['class_id'] = torch.from_numpy(np.array([x['class_id'] for x in batch_list])).long()
    ret['class_name'] = np.array([x['class_name'] for x in batch_list])
    ret['points'] = torch.from_numpy(np.stack([x['points'] for x in batch_list], axis=0)).float()
    ret['seg_labels'] = torch.from_numpy(np.stack([x['seg_labels'] for x in batch_list], axis=0)).long()
    return ret


# NOTE(review): worker processes are started from module level here; on
# platforms that spawn (rather than fork) workers this presumably needs an
# `if __name__ == "__main__":` guard — confirm for your platform.
sample_loader = torch.utils.data.DataLoader(train_set, batch_size=16, num_workers=2,
                                            shuffle=True, collate_fn=collate_fn)
dataloader_iter = iter(sample_loader)
batch_dict = next(dataloader_iter)
print(batch_dict.keys())
for key in ['points', 'seg_labels', 'class_id']:
    print(f"batch_dict[{key}].shape = {batch_dict[key].shape}")

batchSize = 64
workers = 2
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batchSize, shuffle=True,
                                           num_workers=workers, collate_fn=collate_fn)
# Evaluation loaders are not shuffled: order does not affect the metrics and
# a fixed order keeps per-sample output reproducible between runs.
val_loader = torch.utils.data.DataLoader(val_set, batch_size=batchSize, shuffle=False,
                                         num_workers=workers, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batchSize, shuffle=False,
                                          num_workers=workers, collate_fn=collate_fn)


# T-Net
class STN3d(nn.Module):
    """Regress one 3x3 matrix per input cloud.

    The final linear layer's 9 outputs are added to a flattened identity
    matrix and reshaped to (-1, 3, 3).
    """

    def __init__(self, num_points=2500):
        super(STN3d, self).__init__()
        self.num_points = num_points
        self.conv1 = torch.nn.Conv1d(3, 64, 1)
        self.conv2 = torch.nn.Conv1d(64, 128, 1)
        self.conv3 = torch.nn.Conv1d(128, 1024, 1)
        self.mp1 = torch.nn.MaxPool1d(num_points)

        # FC layers
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 9)
        self.relu = nn.ReLU()

        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)
        self.bn4 = nn.BatchNorm1d(512)
        self.bn5 = nn.BatchNorm1d(256)

    def forward(self, x):
        # Expected input shape = (bs, 3, num_points)
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.mp1(x)
        x = x.view(-1, 1024)

        x = F.relu(self.bn4(self.fc1(x)))
        x = F.relu(self.bn5(self.fc2(x)))
        x = self.fc3(x)

        # Identity is created directly on x's device/dtype and broadcast over
        # the batch, replacing the numpy -> Variable -> .cuda() round trip.
        iden = torch.eye(3, dtype=x.dtype, device=x.device).view(1, 9)
        x = x + iden
        x = x.view(-1, 3, 3)
        return x


test_model = STN3d().to(device)
sim_data = torch.rand(32, 3, 2500).to(device)
out = test_model(sim_data)
print('stn', out.size())


# OpenShape
class OpenShape(nn.Module):
    """Three 1x1 convolutions with ELU, followed by a max-pool of width num_points."""

    def __init__(self, num_points=2500):
        super(OpenShape, self).__init__()
        self.conv1 = torch.nn.Conv1d(3, 64, 1)
        self.conv2 = torch.nn.Conv1d(64, 128, 1)
        self.conv3 = torch.nn.Conv1d(128, 1024, 1)
        self.mp1 = torch.nn.MaxPool1d(num_points)

    def forward(self, x):
        x = F.elu(self.conv1(x))
        x = F.elu(self.conv2(x))
        x = F.elu(self.conv3(x))
        x = self.mp1(x)
        return x


test_model = OpenShape().to(device)
sim_data = torch.rand(32, 3, 2500).to(device)
out = test_model(sim_data)
# Label fixed: this smoke test exercises OpenShape, not the STN.
print('openshape', out.size())


#FeatureNet
class PointNetfeat(nn.Module):
    """Feature extractor returning ``(features, trans)``.

    With ``global_feat=True`` the features are the pooled 1024-channel
    vector; otherwise that vector is repeated ``num_points`` times and
    concatenated with the 64-channel per-point features (1088 channels).
    ``trans`` is the 3x3 matrix produced by the STN3d sub-module.
    """

    def __init__(self, num_points=2500, global_feat=True):
        super(PointNetfeat, self).__init__()
        self.stn = STN3d(num_points=num_points)
        self.conv1 = torch.nn.Conv1d(3, 64, 1)
        self.conv2 = torch.nn.Conv1d(64, 128, 1)
        self.conv3 = torch.nn.Conv1d(128, 1024, 1)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)
        self.mp1 = torch.nn.MaxPool1d(num_points)
        self.num_points = num_points
        self.global_feat = global_feat
        # Not used in forward(), but it owns parameters and is therefore part
        # of the state_dict; kept so existing checkpoints still load.
        self.OpenShape = OpenShape(num_points=num_points)

    def forward(self, x):
        trans = self.stn(x)
        # Apply the predicted transform to the xyz coordinates.
        x = x.transpose(2, 1)
        x = torch.bmm(x, trans)
        x = x.transpose(2, 1)
        x = F.elu(self.bn1(self.conv1(x)))
        pointfeat = x
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.bn3(self.conv3(x))
        x = self.mp1(x)
        x = x.view(-1, 1024)
        if self.global_feat:
            return x, trans
        x = x.view(-1, 1024, 1).repeat(1, 1, self.num_points)
        return torch.cat([x, pointfeat], 1), trans


pointfeat = PointNetfeat(global_feat=True).to(device)
out, _ = pointfeat(sim_data)
print('global feat', out.size())

pointfeat = PointNetfeat(global_feat=False).to(device)
out, _ = pointfeat(sim_data)
print('point feat', out.size())


#Train
def dice_score(pred, target):
    """Return the Dice coefficient of two masks as a Python float.

    Both inputs are cast to float, so boolean or integer masks are accepted.
    """
    smooth = 1e-8
    pred = pred.float()
    target = target.float()
    intersection = torch.sum(pred * target)
    union = torch.sum(pred) + torch.sum(target)
    dice = (2. * intersection + smooth) / (union + smooth)
    return dice.item()


def _mean_part_dice(predicted_labels, labels, num_parts):
    """Average dice_score over every label id present in prediction or target."""
    scores = []
    for part in range(num_parts):
        pred_mask = predicted_labels == part
        target_mask = labels == part
        # Skip ids absent from both tensors; they would score a trivial 1.0.
        if not (pred_mask.any() or target_mask.any()):
            continue
        scores.append(dice_score(pred_mask, target_mask))
    return sum(scores) / len(scores) if scores else 0.0


def train_model(model, num_epochs, criterion, optimizer, dataloader_train,
                label_str='class_id', lr_scheduler=None, output_name='pointnet.pth'):
    """Train ``model`` and save its state_dict to ``output_name``.

    The model is called on ``batch['points']`` transposed to channels-first
    and must return a 3-tuple whose first element holds the class scores on
    dim 1. ``label_str`` selects which batch entry is used as the target.

    Returns:
        A list with one dict per epoch: epoch, loss, accuracy, dice.
    """
    # move model to device
    model.to(device)
    history = []
    num_batches = max(len(dataloader_train), 1)  # guard against an empty loader

    for epoch in range(num_epochs):
        print(f"Starting {epoch + 1} epoch ...")

        # Training
        model.train()
        train_loss = 0.0
        correct_predictions = 0
        total_samples = 0
        dice_scores = 0.0

        for batch_dict in tqdm(dataloader_train, total=len(dataloader_train)):
            # Forward pass
            x = batch_dict['points'].transpose(1, 2).to(device)
            labels = batch_dict[label_str].to(device)
            optimizer.zero_grad()
            pred, _, _ = model(x)
            loss = criterion(pred, labels)
            train_loss += loss.item()

            # Backward pass
            loss.backward()
            optimizer.step()

            # adjust learning rate
            # NOTE(review): stepped once per batch; move this out of the batch
            # loop if an epoch-based scheduler is passed in.
            if lr_scheduler is not None:
                lr_scheduler.step()

            # Calculate accuracy over every labelled element. numel() counts
            # points, not clouds, so the ratio cannot exceed 1 for dense labels.
            predicted_labels = pred.detach().argmax(dim=1)
            correct_predictions += (predicted_labels == labels).sum().item()
            total_samples += labels.numel()

            # Calculate Dice Score averaged over the label ids in this batch
            # (the labels are multi-class, so a binary threshold on channel 1
            # is not meaningful).
            dice_scores += _mean_part_dice(predicted_labels, labels, pred.size(1))

        # compute per epoch losses, accuracy and Dice Score
        train_loss = train_loss / num_batches
        accuracy = correct_predictions / max(total_samples, 1)
        avg_dice_score = dice_scores / num_batches

        print(f'Epoch: {epoch + 1}, Train Loss: {train_loss:6.5f}, '
              f'Accuracy: {accuracy:.4f}, Dice Score: {avg_dice_score:.4f}')
        history.append({'epoch': epoch + 1, 'loss': train_loss,
                        'accuracy': accuracy, 'dice': avg_dice_score})

        # Overwrite the checkpoint after every epoch so an interrupted run
        # still leaves usable weights behind.
        torch.save(model.state_dict(), output_name)

    return history


class PointNetDenseSeg(nn.Module):
    """Two per-point heads on top of the 1088-channel PointNetfeat features.

    forward() returns ``(x_sem, x_inst, trans)``: ``num_classes`` channels
    from the semantic head, ``num_instances`` channels from the instance
    head, and the transform returned by the feature extractor.
    """

    def __init__(self, num_points=2500, num_classes=16, num_instances=10):
        super(PointNetDenseSeg, self).__init__()
        self.num_points = num_points
        self.num_classes = num_classes
        self.num_instances = num_instances

        # Feature extraction using PointNetfeat
        self.feat = PointNetfeat(num_points, global_feat=False)

        # Semantic segmentation layers
        self.conv1_sem = torch.nn.Conv1d(1088, 512, 1)
        self.conv2_sem = torch.nn.Conv1d(512, 256, 1)
        self.conv3_sem = torch.nn.Conv1d(256, 128, 1)
        self.conv4_sem = torch.nn.Conv1d(128, num_classes, 1)
        self.bn1_sem = nn.BatchNorm1d(512)
        self.bn2_sem = nn.BatchNorm1d(256)
        self.bn3_sem = nn.BatchNorm1d(128)

        # Instance segmentation layers
        self.conv1_inst = torch.nn.Conv1d(1088, 512, 1)
        self.conv2_inst = torch.nn.Conv1d(512, 256, 1)
        self.conv3_inst = torch.nn.Conv1d(256, 128, 1)
        self.conv4_inst = torch.nn.Conv1d(128, num_instances, 1)
        self.bn1_inst = nn.BatchNorm1d(512)
        self.bn2_inst = nn.BatchNorm1d(256)
        self.bn3_inst = nn.BatchNorm1d(128)

    def forward(self, x):
        x, trans = self.feat(x)

        # Semantic segmentation branch
        x_sem = F.relu(self.bn1_sem(self.conv1_sem(x)))
        x_sem = F.relu(self.bn2_sem(self.conv2_sem(x_sem)))
        x_sem = F.relu(self.bn3_sem(self.conv3_sem(x_sem)))
        x_sem = self.conv4_sem(x_sem)

        # Instance segmentation branch
        x_inst = F.relu(self.bn1_inst(self.conv1_inst(x)))
        x_inst = F.relu(self.bn2_inst(self.conv2_inst(x_inst)))
        x_inst = F.relu(self.bn3_inst(self.conv3_inst(x_inst)))
        x_inst = self.conv4_inst(x_inst)

        return x_sem, x_inst, trans


sseg = PointNetDenseSeg().to(device)
out, _, _ = sseg(sim_data)
print('sseg', out.size())

epoch = 10
weight_decay = 1e-4
max_lr = 1e-3
num_points = 2500
# The segmentation head predicts part labels, so its width is the number of
# parts, not the number of object categories (both happen to be 16 here).
num_classes = NUM_PARTS
criterion = nn.CrossEntropyLoss()

# create model, optimizer, lr_scheduler and pass to training function
classifier = PointNetDenseSeg(num_points=num_points, num_classes=num_classes)

# DEFINE OPTIMIZERS
optimizer = torch.optim.AdamW(classifier.parameters(), lr=max_lr, weight_decay=weight_decay)
classifier.to(device)

#Train
# history = train_model(classifier, epoch, criterion, optimizer, train_loader,
#                       label_str='seg_labels', output_name='pointnet_se_seg.pth')

# Model Test
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only host.
classifier.load_state_dict(torch.load('pointnet_se_seg.pth', map_location=device))
classifier.eval()

total_loss = 0.0
correct_predictions = 0
total_samples = 0

with torch.no_grad():
    for batch_dict in tqdm(test_loader, total=len(test_loader)):
        x = batch_dict['points'].transpose(1, 2).to(device)
        labels = batch_dict['seg_labels'].to(device)
        pred, _, _ = classifier(x)

        # calculate loss
        loss = criterion(pred, labels)
        total_loss += loss.item()

        # calculate accuracy over points: numel() matches the per-point
        # comparison below, whereas size(0) would count clouds only.
        predicted = pred.argmax(dim=1)
        matches = predicted == labels
        correct_predictions += matches.sum().item()
        total_samples += labels.numel()

        # Print ground truth and predicted labels for each sample in the batch.
        # Each sample carries one label per point, so .item() on labels[i]
        # would raise; report the distinct label ids and point accuracy instead.
        per_sample_accuracy = matches.float().mean(dim=1).tolist()
        for i, sample_accuracy in enumerate(per_sample_accuracy):
            gt_label = torch.unique(labels[i]).tolist()  # ground truth label ids
            pred_label = torch.unique(predicted[i]).tolist()  # predicted label ids
            print(f"Ground truth label: {gt_label}, Predicted label: {pred_label}, "
                  f"Point accuracy: {sample_accuracy:.4f}")

evaluation_loss = total_loss / max(len(test_loader), 1)
accuracy = correct_predictions / max(total_samples, 1)
print(evaluation_loss)
print(f"Evaluation Loss: {evaluation_loss:.4f}")
print(f"Evaluation Accuracy: {accuracy:.4f}")

# # Now classify a point cloud
# point_cloud_path = 'Shapenetcore_benchmark/02691156/points/1d269dbde96f067966cf1b4a8fc3914e.npy'
# point_cloud = np.load(point_cloud_path)

# # Ensure the point cloud is of size [num_points, 3]
# if point_cloud.shape[0] < 2500:
#     # If fewer points, replicate the points to reach 2500
#     additional_points = np.random.choice(point_cloud.shape[0], 2500 - point_cloud.shape[0], replace=True)
#     point_cloud = np.concatenate((point_cloud, point_cloud[additional_points]), axis=0)
# else:
#     # If more points, randomly sample 2500 points
#     point_cloud = point_cloud[np.random.choice(point_cloud.shape[0], 2500, replace=False)]

# # Reshape the point cloud to match the input shape [batch_size, 3, num_points]
# point_cloud = point_cloud.T  # Shape (3, num_points)
# point_cloud = point_cloud[np.newaxis, :, :]  # Add batch dimension [1, 3, num_points]

# # Convert point cloud to torch tensor
# point_cloud = torch.tensor(point_cloud, dtype=torch.float32).to(device)

# # Perform inference
# with torch.no_grad():
#     pred_sem, pred_inst, _ = classifier(point_cloud)

# # You can get the semantic segmentation results (for each point) by selecting the class with the highest score
# pred_class = torch.argmax(pred_sem, dim=1)  # Get class predictions for each point (semantic segmentation)

# # Now, to classify the entire point cloud (global classification)
# # You can use the class with the highest average probability across all points
# NOTE(review): the two lines below take an argmax over the point axis and then
# a max over those indices, which does not match the description above —
# revisit before re-enabling this snippet.
# class_predictions = torch.argmax(pred_sem, dim=2)  # (1, num_classes) (global class)
# _, predicted_class = torch.max(class_predictions, 1)

# # Get the predicted class as a scalar (if needed) for the first item in the batch
# predicted_class = predicted_class.item()  # This gives a scalar for the first sample in the batch

# # Convert the predicted class id to the actual class name
# predicted_class_name = class_id_name_map[predicted_class]
# print(f"Predicted class: {predicted_class_name} (ID: {predicted_class})")
# Paste-page footer (not part of the script):
# Editor is loading... | Leave a Comment