# -*- coding: utf-8 -*-
"""Copy of distance.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1Yf-RjL6ODlTn4iTm0p5295P3nWTOI8VG
"""
from google.colab import drive
drive.mount('/content/drive')
drive_root = "/content/drive/MyDrive/Task3"
# Commented out IPython magic to ensure Python compatibility.
# %cd $drive_root
# This serves as a template which will guide you through the implementation of this task. It is advised
# to first read the whole template and get a sense of the overall structure of the code before trying to fill in any of the TODO gaps
# First, we import necessary libraries:
import numpy as np
from torchvision import transforms
from torch.utils.data import DataLoader, TensorDataset
import os
import torch
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
plt.ioff()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
def generate_embeddings():
"""
Transform, resize and normalize the images and then use a pretrained model to extract
the embeddings.
"""
    # Transform the images to the input size the pretrained model expects (resize to 384,
    # center-crop to 380) and apply the ImageNet normalization it was trained with
train_transforms = transforms.Compose([
transforms.Resize(384),
transforms.CenterCrop(380),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)])
train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)
# Hint: adjust batch_size and num_workers to your PC configuration, so that you don't
# run out of memory
train_loader = DataLoader(dataset=train_dataset,
batch_size=64,
shuffle=False,
pin_memory=True, num_workers=2)
    # we use the pretrained EfficientNet-B4 model
weights = models.EfficientNet_B4_Weights.DEFAULT
model = models.efficientnet_b4(weights=weights)
# freeze the base layers
for param in model.features.parameters():
param.requires_grad = False
    # remove the classification head so the network outputs the pooled features
    newmodel = torch.nn.Sequential(*(list(model.children())[:-1])).to(device)
    newmodel.eval()
    embedding_size = 1792  # size of the efficientnet_b4 output after removing the classifier
    y = torch.zeros(0, embedding_size).to(device)  # empty tensor that will hold all the embeddings
    with torch.no_grad():
        for batch, (X, _) in enumerate(train_loader):
            # the backbone returns (batch, 1792, 1, 1); drop the trailing spatial dimensions
            y_c = newmodel(X.to(device))[:, :, 0, 0]
            y = torch.cat((y, y_c))
            if (batch + 1) % 5 == 0:
                print(f'{(batch + 1) * 64} images processed')
    embeddings = y.cpu().numpy()
    np.save('dataset/embeddings.npy', embeddings)
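# The helper below is an optional sanity check and an addition to the original template:
# it assumes 'dataset/embeddings.npy' was written by generate_embeddings() above and simply
# verifies that the number of embedding rows matches the number of images that ImageFolder
# enumerates, since get_data() relies on that one-to-one ordering.
def check_embeddings(embedding_file='dataset/embeddings.npy', image_root="dataset/"):
    embeddings = np.load(embedding_file)
    dataset = datasets.ImageFolder(root=image_root, transform=None)
    assert embeddings.shape[0] == len(dataset.samples), (
        f"{embeddings.shape[0]} embeddings vs {len(dataset.samples)} images")
    print(f"embeddings OK: {embeddings.shape[0]} rows of size {embeddings.shape[1]}")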
def get_data(file, train=True):
"""
Load the triplets from the file and generate the features and labels.
input: file: string, the path to the file containing the triplets
train: boolean, whether the data is for training or testing
output: X: numpy array, the features
y: numpy array, the labels
"""
triplets = []
with open(file) as f:
for line in f:
triplets.append(line)
# generate training data from triplets
train_dataset = datasets.ImageFolder(root="dataset/",
transform=None)
filenames = [s[0].split('/')[-1].replace('.jpg', '') for s in train_dataset.samples]
embeddings = np.load('dataset/embeddings.npy')
    # We normalize the embeddings across the dataset; min-max scaling is used because
    # the embeddings are not Gaussian-distributed
    scaler = MinMaxScaler()
    embeddings = scaler.fit_transform(embeddings)
file_to_embedding = {}
for i in range(len(filenames)):
file_to_embedding[filenames[i]] = embeddings[i]
if train:
triplets, val_triplets = train_test_split(triplets, test_size=0.2, random_state=1) # 20 percent for validation
triplets, test_triplets = train_test_split(triplets, test_size=0.125,
random_state=1) # 10 percent for final test
X = []
y = []
X_v = []
y_v = []
X_t = []
y_t = []
    # use the individual embeddings to generate the features and labels for the triplets:
    # a triplet (A, B, C) becomes the concatenation [emb(A), emb(B), emb(C)] with label 1,
    # and the swapped version [emb(A), emb(C), emb(B)] with label 0 as a negative sample
    for t in triplets:
        emb = [file_to_embedding[a] for a in t.split()]
        X.append(np.hstack([emb[0], emb[1], emb[2]]))
        y.append(1)
        # Generating negative samples (data augmentation)
        if train:
            X.append(np.hstack([emb[0], emb[2], emb[1]]))
            y.append(0)
    if train:
        for t in val_triplets:
            emb = [file_to_embedding[a] for a in t.split()]
            X_v.append(np.hstack([emb[0], emb[1], emb[2]]))
            y_v.append(1)
            X_v.append(np.hstack([emb[0], emb[2], emb[1]]))
            y_v.append(0)
        for t in test_triplets:
            emb = [file_to_embedding[a] for a in t.split()]
            X_t.append(np.hstack([emb[0], emb[1], emb[2]]))
            y_t.append(1)
            X_t.append(np.hstack([emb[0], emb[2], emb[1]]))
            y_t.append(0)
X = np.vstack(X)
y = np.hstack(y)
if train:
X_v = np.vstack(X_v)
y_v = np.hstack(y_v)
X_t = np.vstack(X_t)
y_t = np.hstack(y_t)
return X, y, X_v, y_v, X_t, y_t
else:
return X, y
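# Illustrative note (an addition to the original template): with 1792-dimensional
# embeddings, each feature row produced by get_data() is the concatenation of three
# embeddings, i.e. 3 * 1792 = 5376 values, which is why the classifier below defaults
# to in_features=5376. A quick way to confirm the shapes:
#
#     X, y, X_v, y_v, X_t, y_t = get_data('train_triplets.txt')
#     print(X.shape)  # expected: (n_rows, 5376)
#     print(y.shape)  # expected: (n_rows,)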
# Hint: adjust batch_size and num_workers to your PC configuration, so that you don't run out of memory
def create_loader_from_np(X, y=None, train=True, batch_size=64, shuffle=True, num_workers=2):
"""
Create a torch.utils.data.DataLoader object from numpy arrays containing the data.
input: X: numpy array, the features
y: numpy array, the labels
    output: loader: torch.utils.data.DataLoader, the object containing the data
"""
if train:
dataset = TensorDataset(torch.from_numpy(X).type(torch.float),
torch.from_numpy(y).type(torch.long))
else:
dataset = TensorDataset(torch.from_numpy(X).type(torch.float))
loader = DataLoader(dataset=dataset,
batch_size=batch_size,
shuffle=shuffle,
pin_memory=True, num_workers=num_workers)
return loader
# The model: a small fully connected classifier that operates on the concatenated embeddings of a triplet
class Net(nn.Module):
"""
The model class, which defines our classifier.
"""
    def __init__(self, in_features=5376, h1=2048, out_features=1, dropout=1.0):
        """
        The constructor of the model.
        input: in_features: int, size of one feature row (three concatenated 1792-d embeddings)
               dropout: float, scaling factor for the dropout probabilities
        """
        super().__init__()
        self.fc1 = nn.Linear(in_features, h1)
        self.out = nn.Linear(h1, out_features)
        self.dropout1 = nn.Dropout(0.2 * dropout)  # applied to the input features
        self.dropout2 = nn.Dropout(0.5 * dropout)  # applied to the hidden layer

    def forward(self, x):
        """
        The forward pass of the model.
        input: x: torch.Tensor, the input to the model
        output: x: torch.Tensor, the output of the model
        """
        x = self.dropout1(x)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.out(x)
        x = torch.sigmoid(x)  # BCELoss expects probabilities in [0, 1]
        return x
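# Quick shape check (illustrative only, an addition to the original template): the classifier
# maps a 5376-dimensional feature row (three concatenated 1792-d embeddings) to a single
# probability per triplet. Under that assumption:
#
#     net = Net()
#     dummy = torch.rand(4, 5376)   # a batch of 4 concatenated-embedding rows
#     print(net(dummy).shape)       # expected: torch.Size([4, 1])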
def train_model(train_loader, val_loader, fval_loader, alpha, wd, d):
"""
The training procedure of the model; it accepts the training data, defines the model
and then trains it.
input: train_loader: torch.data.util.DataLoader, the object containing the training data
output: model: torch.nn.Module, the trained model
"""
model = Net(dropout=d)
model.train()
model.to(device)
n_epochs = 100
    criterion = nn.BCELoss()  # binary cross-entropy loss on the sigmoid outputs
# optimizer = torch.optim.Adam(model.parameters(),lr=0.001, weight_decay=0.00001)
optimizer = torch.optim.SGD(model.parameters(), lr=alpha, momentum=0.9, weight_decay=wd)
train_loss = []
val_loss = []
    for epoch in range(n_epochs):
        T_loss = []  # train losses over the epoch
        V_loss = []  # validation losses over the epoch
        model.train()
        for X, y in train_loader:
            y_pred = torch.flatten(model(X.to(device)))
            loss = criterion(y_pred, y.float().to(device))
            T_loss.append(loss.item())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        model.eval()
        with torch.no_grad():
            for X, y in val_loader:
                y_pred = torch.flatten(model(X.to(device)))
                lossv = criterion(y_pred, y.float().to(device))
                V_loss.append(lossv.item())
        T_loss = np.mean(T_loss)
        V_loss = np.mean(V_loss)
        print(f'Epoch {epoch}: train_loss = {T_loss}')
        print(f'Epoch {epoch}: val_loss = {V_loss}')
        if epoch % 5 == 4:
            val_model(model, fval_loader)  # accuracy on the held-out 10% split
        train_loss.append(T_loss)
        val_loss.append(V_loss)
print(train_loss)
print(val_loss)
return model
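# Note (an addition, not part of the original template): nn.BCELoss expects probabilities
# in [0, 1], which is why Net ends with a sigmoid. An equivalent and numerically more
# stable alternative would be to drop the sigmoid from the model and train with
# nn.BCEWithLogitsLoss on the raw logits instead, e.g.:
#
#     criterion = nn.BCEWithLogitsLoss()
#     loss = criterion(logits, y.float().to(device))  # logits = model output before sigmoid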
def val_model(model, loader):
"""
The testing procedure of the model; it accepts the testing data and the trained model and
then tests the model on it.
input: model: torch.nn.Module, the trained model
loader: torch.data.util.DataLoader, the object containing the testing data
output: None, the function saves the predictions to a results.txt file
"""
model.eval()
predictions = []
y = []
# Iterate over the test data
with torch.no_grad(): # We don't need to compute gradients for testing
for [x_batch, y_batch] in loader:
x_batch = x_batch.to(device)
y_batch = y_batch.to(device)
predicted = model(x_batch)
predicted = predicted.cpu().numpy()
y_batch = y_batch.cpu().numpy()
# Rounding the predictions to 0 or 1
predicted[predicted >= 0.5] = 1
predicted[predicted < 0.5] = 0
predictions.append(predicted)
y_batch = np.reshape(y_batch, (len(y_batch), 1))
y.append(y_batch)
# print(y_batch.shape, predicted.shape)
predictions = np.vstack(predictions)
y = np.vstack(y)
acc = np.sum(y == predictions) / len(predictions)
print(f'Final accuracy calculated for the validation data is: {acc}')
def test_model(model, loader, alpha, wd, dropout):
"""
The testing procedure of the model; it accepts the testing data and the trained model and
then tests the model on it.
input: model: torch.nn.Module, the trained model
loader: torch.data.util.DataLoader, the object containing the testing data
output: None, the function saves the predictions to a results.txt file
"""
model.eval()
predictions = []
# Iterate over the test data
with torch.no_grad(): # We don't need to compute gradients for testing
for [x_batch] in loader:
x_batch = x_batch.to(device)
predicted = model(x_batch)
predicted = predicted.cpu().numpy()
# Rounding the predictions to 0 or 1
predicted[predicted >= 0.5] = 1
predicted[predicted < 0.5] = 0
predictions.append(predicted)
predictions = np.vstack(predictions)
np.savetxt(f"results{alpha},{wd},{dropout}.txt", predictions, fmt='%i')
if __name__ == '__main__':
TRAIN_TRIPLETS = 'train_triplets.txt'
TEST_TRIPLETS = 'test_triplets.txt'
# generate embedding for each image in the dataset
    if not os.path.exists('dataset/embeddings.npy'):
generate_embeddings()
    # load the training and testing data
    X, y, X_v, y_v, X_t, y_t = get_data(TRAIN_TRIPLETS)
    X_test, _ = get_data(TEST_TRIPLETS, train=False)

    # Create data loaders for the training, validation and testing data
    train_loader = create_loader_from_np(X, y, train=True, batch_size=64)
    val_loader = create_loader_from_np(X_v, y_v, train=True, batch_size=128)
    fval_loader = create_loader_from_np(X_t, y_t, train=True, batch_size=128)
    test_loader = create_loader_from_np(X_test, train=False, batch_size=2048, shuffle=False)

    # for every hyperparameter combination: train a model, validate it on the held-out
    # 10% split and save its predictions on the test data
    for alpha in [0.001]:
        for wd in [0.0001, 0.001]:
            for d in [0.5, 1, 1.5]:
                print('hyperparameters:')
                print(alpha, wd, d)
                model = train_model(train_loader, val_loader, fval_loader, alpha, wd, d)
                val_model(model, fval_loader)
                # test the model on the test data
                test_model(model, test_loader, alpha, wd, d)
                print(f"Results saved to results{alpha},{wd},{d}.txt")