import os
import re
from itertools import combinations

import cv2
import keras.backend as backend
import matplotlib.pyplot as plt
import numpy as np
from keras import Sequential
from keras.layers import (Conv2D, Dense, Dropout, Flatten,
                          GlobalAveragePooling2D, Input, Lambda, MaxPooling2D)
from keras.models import Model
from keras.optimizers import Adam
from keras.regularizers import l2

# Sequential, Flatten, Adam and l2 are only referenced by the commented-out
# variants kept below for reference.

SHAPE = (280, 320, 1)  # (height, width, channels) of each grayscale iris image
BATCH_SIZE = 32
EPOCHS = 30
DIRECTORY = "nowy_sposb"  # output directory (Polish: "new way")
MODEL_PATH = os.path.sep.join([DIRECTORY, "siamese_model"])
PLOT_PATH = os.path.sep.join([DIRECTORY, "plot.png"])


# def build_siamese_model(inputShape):
#     inputs = Input(inputShape)
#     x = Conv2D(64, (2, 2), padding="same", activation="relu")(inputs)
#     x = MaxPooling2D(pool_size=(2, 2))(x)
#     x = Dropout(0.3)(x)
#     x = Conv2D(64, (2, 2), padding="same", activation="relu")(x)
#     x = MaxPooling2D(pool_size=2)(x)
#     x = Dropout(0.3)(x)
#
#     pooledOutput = GlobalAveragePooling2D()(x)
#     outputs = Dense(48)(pooledOutput)
#
#     model = Model(inputs, outputs)
#
#     return model

def build_third_model(inputShape):
    """Embedding CNN: four conv blocks pooled down to a 48-dim feature vector."""
    inputs = Input(inputShape)
    x = Conv2D(64, (4, 4), padding="same", activation="relu")(inputs)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    # x = Dropout(0.3)(x)
    x = Conv2D(128, (4, 4), padding="same", activation="relu")(x)
    x = MaxPooling2D(pool_size=2)(x)
    x = Dropout(0.3)(x)
    x = Conv2D(128, (4, 4), padding="same", activation="relu")(x)
    x = MaxPooling2D(pool_size=2)(x)
    x = Dropout(0.3)(x)
    x = Conv2D(256, (4, 4), padding="same", activation="relu")(x)
    x = Dropout(0.3)(x)
    # prepare the final outputs
    pooledOutput = GlobalAveragePooling2D()(x)
    outputs = Dense(48)(pooledOutput)
    # build the model
    model = Model(inputs, outputs)
    # return the model to the calling function
    return model
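# A quick sanity check (a sketch, commented out so the training run below is
# unchanged): one (280, 320, 1) grayscale image maps to a 48-dim embedding.
# embedding = build_third_model(SHAPE)
# embedding.summary()  # last layer should read Dense(48)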


# def initialize_bias(shape, dtype=None):
#     """
#         The paper http://www.cs.utoronto.ca/~gkoch/files/msc-thesis.pdf
#         suggests initializing CNN layer biases with a mean of 0.5 and a
#         standard deviation of 0.01.
#     """
#     return np.random.normal(loc=0.5, scale=1e-2, size=shape)
#
#
# def initialize_weights(shape, dtype=None):
#     """
#         The paper http://www.cs.utoronto.ca/~gkoch/files/msc-thesis.pdf
#         suggests initializing CNN layer weights with a mean of 0.0 and a
#         standard deviation of 0.01.
#     """
#     return np.random.normal(loc=0.0, scale=1e-2, size=shape)
#
#
# bias = initialize_bias((1000, 1))
# weights = initialize_weights((1000, 1))
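# The commented-out builder below follows the one-shot design from the Koch
# et al. thesis linked above: a shared convolutional encoder, an L1-distance
# merge, and a sigmoid similarity head.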


# def build_another_model(inputShape):
#     left_input = Input(inputShape)
#     right_input = Input(inputShape)
#     model = Sequential()
#     model.add(Conv2D(64, (10, 10), activation='relu', input_shape=inputShape,
#                      kernel_initializer=initialize_weights, kernel_regularizer=l2(2e-4)))
#     model.add(MaxPooling2D())
#     model.add(Conv2D(128, (7, 7), activation='relu',
#                      kernel_initializer=initialize_weights,
#                      bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)))
#     model.add(MaxPooling2D())
#     model.add(Conv2D(128, (4, 4), activation='relu', kernel_initializer=initialize_weights,
#                      bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)))
#     model.add(MaxPooling2D())
#     model.add(Conv2D(256, (4, 4), activation='relu', kernel_initializer=initialize_weights,
#                      bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)))
#     model.add(Flatten())
#     model.add(Dense(4096, activation='sigmoid',
#                     kernel_regularizer=l2(1e-3),
#                     kernel_initializer=initialize_weights, bias_initializer=initialize_bias))
#     encoded_l = model(left_input)
#     encoded_r = model(right_input)
#     L1_layer = Lambda(lambda tensors: backend.abs(tensors[0] - tensors[1]))
#     L1_distance = L1_layer([encoded_l, encoded_r])
#     prediction = Dense(1, activation='sigmoid', bias_initializer=initialize_bias)(L1_distance)
#     siamese_net = Model(inputs=[left_input, right_input], outputs=prediction)
#     return siamese_net


def create_positive_negative_pairs(images, labels):
    """For every image, build one positive pair (same label) and one
    negative pair (different label)."""
    pair_images = []
    pair_labels = []
    # image indices grouped by class label (labels are 0..k-1 by construction)
    idx = [np.where(labels == i)[0] for i in np.unique(labels)]
    for i in range(len(images)):
        currentImage = images[i]
        label = labels[i]

        chooseFrom = np.delete(idx[label], np.argwhere(idx[label] == i))
        idxB = np.random.choice(chooseFrom)
        posImage = images[idxB]

        pair_images.append([currentImage, posImage])
        pair_labels.append([1])

        negIdx = np.where(labels != label)[0]
        negImage = images[np.random.choice(negIdx)]

        pair_images.append([currentImage, negImage])
        pair_labels.append([0])

    return np.array(pair_images), np.array(pair_labels)
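# The function above emits exactly one positive and one negative pair per
# anchor image, so the returned arrays hold 2 * len(images) pairs, balanced
# by construction.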


def create_positive_negative_pairs_new(images, labels):
    """Build every within-class pair as a positive, plus a fixed number of
    random cross-class negatives per image."""
    pair_images = []
    pair_labels = []
    # image indices grouped by class label (labels are 0..k-1 by construction)
    idx = [np.where(labels == i)[0] for i in np.unique(labels)]
    # positives: all unordered pairs of images that share a label
    for class_indices in idx:
        for first, second in combinations(class_indices, 2):
            pair_images.append([images[first], images[second]])
            pair_labels.append([1])
    # negatives: neg_amount randomly chosen images with a different label
    # per anchor image
    for i in range(len(images)):
        currentImage = images[i]
        label = labels[i]
        negIdx = np.where(labels != label)[0]
        neg_amount = 8
        for _ in range(neg_amount):
            negImage = images[np.random.choice(negIdx)]
            pair_images.append([currentImage, negImage])
            pair_labels.append([0])

    return np.array(pair_images), np.array(pair_labels)
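# With the 5 training images per class loaded below, this yields
# C(5, 2) = 10 positive pairs per class but 8 negatives per image
# (40 per class), so negatives outnumber positives 4:1; lower neg_amount
# for a more balanced training set.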




def list_files(directory):
    """Walk the dataset; for each left-eye ("/L") folder with at least 7
    images, use the first 5 as training samples and the next 2 as test
    samples. Stops after 100 classes."""
    trainX = []
    trainY = []
    testX = []
    testY = []
    i = 0
    indexes = []
    for root, dirs, files in os.walk(directory):
        if root.endswith("/L") and len(files) >= 7:
            files = sorted(files)
            label_number = i
            trainY += 5 * [label_number]
            trainX += [cv2.imread(f"{root}/{single_image}", cv2.IMREAD_GRAYSCALE) for single_image in files[0:5]]
            testY.append(label_number)
            testX.append(cv2.imread(f"{root}/{files[5]}", cv2.IMREAD_GRAYSCALE))
            testY.append(label_number)
            testX.append(cv2.imread(f"{root}/{files[6]}", cv2.IMREAD_GRAYSCALE))
            i = i + 1
            # subject id, e.g. ".../CASIA-Iris-Interval/001/L" -> "001"
            indexes.append(re.search("l/(.*)/L", root).group(1))
        if i == 100:
            print(f"indexes {indexes}")
            break
    return (np.array(trainX), np.array(trainY)), (np.array(testX), np.array(testY))
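# list_files assumes the CASIA-Iris-Interval layout <root>/<subject>/L/<image>;
# note the regex above also relies on the path segment before the subject id
# ending in a lowercase "l" (as "Interval" does), so a renamed dataset root
# would break it.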


def list_files_only_calc(directory):
    """Scratch helper: gathers the image count of every right-eye ("/R")
    folder. The sorted counts were presumably inspected manually; the
    constant return value is unused by the rest of the script."""
    lens = []
    for root, dirs, files in os.walk(directory):
        if root.endswith("/R") and len(files) >= 1:
            lens.append(len(files))
    lens = sorted(lens)
    return 2




def list_files_one_set(directory):
    """Like list_files, but returns all six usable images per class as one
    training set with no test split. Stops after 100 classes."""
    trainX = []
    trainY = []
    i = 0
    indexes = []
    for root, dirs, files in os.walk(directory):
        if root.endswith("/L") and len(files) >= 7:
            files = sorted(files)
            label_number = i
            trainY += 6 * [label_number]
            trainX += [cv2.imread(f"{root}/{single_image}", cv2.IMREAD_GRAYSCALE) for single_image in files[0:6]]
            i = i + 1
            # subject id, e.g. ".../001/L" -> "001"
            indexes.append(re.search("l/(.*)/L", root).group(1))
        if i == 100:
            print(f"indexes {indexes}")
            break
    return np.array(trainX), np.array(trainY)


def euclidean_distance(vectors):
    """Batch-wise Euclidean distance between two embedding tensors; the
    epsilon floor keeps the sqrt gradient finite at zero distance."""
    (featsA, featsB) = vectors
    sumSquared = backend.sum(backend.square(featsA - featsB), axis=1,
                             keepdims=True)
    return backend.sqrt(backend.maximum(sumSquared, backend.epsilon()))
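# A common alternative head (a sketch, not used in this script) trains the
# raw distance directly with a contrastive loss instead of the
# sigmoid-on-distance classifier built below; margin is a free parameter.
# def contrastive_loss(y_true, y_pred, margin=1.0):
#     y_true = backend.cast(y_true, y_pred.dtype)
#     positive_term = y_true * backend.square(y_pred)
#     negative_term = (1 - y_true) * backend.square(backend.maximum(margin - y_pred, 0))
#     return backend.mean(positive_term + negative_term)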


def plot_training(H, plotPath):
    """Plot training/validation loss and accuracy curves and save the figure."""
    plt.style.use("ggplot")
    plt.figure()
    plt.plot(H.history["loss"], label="train_loss")
    plt.plot(H.history["val_loss"], label="val_loss")
    plt.plot(H.history["accuracy"], label="train_acc")
    plt.plot(H.history["val_accuracy"], label="val_acc")
    plt.title("Training Loss and Accuracy")
    plt.xlabel("Epoch #")
    plt.ylabel("Loss/Accuracy")
    plt.legend(loc="lower left")
    plt.savefig(plotPath)


(trainX, trainY), (testX, testY) = list_files("/Users/piotrmucha/Documents/CASIA-Iris-Interval")
# list_files_only_calc("/Users/piotrmucha/Documents/CASIA-Iris-Interval")
# (trainX, trainY) = list_files_one_set("/Users/piotrmucha/Documents/CASIA-Iris-Interval")

trainX = trainX / 255.0
testX = testX / 255.0

trainX = np.expand_dims(trainX, axis=-1)
testX = np.expand_dims(testX, axis=-1)
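# trainX/testX now have shape (N, 280, 320, 1), matching SHAPE, with pixel
# values scaled to [0, 1].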

(pairTrain, labelTrain) = create_positive_negative_pairs_new(trainX, trainY)
(pairTest, labelTest) = create_positive_negative_pairs_new(testX, testY)
# pairTrain, labelTrain, pairTest, labelTest = create_positive_negative_pairs_whole(trainX, trainY)

# two image inputs feed a single embedding network; the shared weights are
# what make the model siamese
imgA = Input(shape=SHAPE)
imgB = Input(shape=SHAPE)
siamese_network = build_third_model(SHAPE)
# siamese_network = build_another_model(SHAPE)
featsA = siamese_network(imgA)
featsB = siamese_network(imgB)
distance = Lambda(euclidean_distance)([featsA, featsB])
outputs = Dense(1, activation="sigmoid")(distance)

model = Model(inputs=[imgA, imgB], outputs=outputs)

model.compile(loss="binary_crossentropy", optimizer="adam",
              metrics=["accuracy"])

# a smaller learning rate may help convergence; recent Keras versions spell
# the argument learning_rate instead of lr:
# optimizer = Adam(learning_rate=0.00006)
# model.compile(loss="binary_crossentropy", optimizer=optimizer)

history = model.fit(
    [pairTrain[:, 0], pairTrain[:, 1]], labelTrain,
    validation_data=([pairTest[:, 0], pairTest[:, 1]], labelTest),
    batch_size=BATCH_SIZE,
    epochs=EPOCHS)
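# A quick qualitative check (a sketch using the names defined above):
# model.predict returns one similarity score in [0, 1] per test pair, where
# values near 1 mean "same iris".
# preds = model.predict([pairTest[:, 0], pairTest[:, 1]])
# print(preds[:10].ravel(), labelTest[:10].ravel())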

model.save(MODEL_PATH)
plot_training(history, PLOT_PATH)