Tumor_segmentation

mail@pastecode.io avatar
unknown
python
7 months ago
11 kB
2
Indexable
Never
import os
import numpy as np
import cv2
import tensorflow as tf
from tensorflow import keras
from keras import layers, models, callbacks
import matplotlib.pyplot as plt
from PIL import Image, ImageFilter
import sklearn
from sklearn.model_selection import train_test_split
import keras.backend as K
import base64 
import matplotlib.patches as mpatches
#%%
#Utility functions"
def preprocessing_function(edge_enhance, equalization, blur, image):

    # apply gaussian blur to the image
    if blur != 0:
      image = cv2.GaussianBlur(image, (blur, blur), 0 )
    # edge enhance using PIL
    if edge_enhance:
        image = Image.fromarray(image)
        image = image.filter(ImageFilter.EDGE_ENHANCE)
        image = np.array(image)
    else:
        image = image

    # equalization
    if equalization == 'histogram':
        image = cv2.equalizeHist(image)
    elif equalization == 'adaptive':
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
        image = clahe.apply(image)
    else:
        image = image

    # Flip
    image = image
    image1 = cv2.flip(image, 1)

    return image, image1

import tensorflow as tf

def load_and_preprocess(image_path, label_path, target_size=224):
    # Load the image and label
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=1)
    label = tf.io.read_file(label_path)
    label = tf.image.decode_png(label, channels=1)

    # Convert both image and label to float32
    image = tf.image.convert_image_dtype(image, tf.float32)
    label = tf.image.convert_image_dtype(label, tf.float32)

    # Resize the image and label to maintain aspect ratio
    shape = tf.shape(image)[:2]
    ratio = tf.cast(target_size, tf.float32) / tf.cast(tf.reduce_max(shape), tf.float32)
    new_shape = tf.cast(tf.round(tf.cast(shape, tf.float32) * ratio), tf.int32)
    
    # Use bicubic interpolation for resizing
    image = tf.image.resize(image, new_shape, method=tf.image.ResizeMethod.BICUBIC)
    label = tf.image.resize(label, new_shape, method=tf.image.ResizeMethod.BICUBIC)

    # Pad the image and label to the target size
    pad_height = target_size - new_shape[0]
    pad_width = target_size - new_shape[1]
    image = tf.image.pad_to_bounding_box(image, pad_height // 2, pad_width // 2, target_size, target_size)
    label = tf.image.pad_to_bounding_box(label, pad_height // 2, pad_width // 2, target_size, target_size)

    # Round the label to 0 or 1 after resizing and padding
    label = tf.round(label)
    return image, label
#%%
# Configuration Settings
base_path = "/mnt/c/Users/andre/OneDrive/Documents/tumor_segmentation_data"
data_dir = os.path.join(base_path, "patients/imgs/")
target_path = os.path.join(base_path, "aug/patient/")
labels_original = os.path.join(base_path, "patients/labels/")  # path to the original dataset
augmented_labels = os.path.join(base_path, "aug/labels/")    # path to where the preprocessed dataset will be stored
preprocessing_args = {'edge_enhance': True, 'equalization': 'adaptive', 'blur': 5}
size = None
input_size = (size, size, 1)
batch_size = 2
num_classes = 1
#%%
# Creating directories using the base path
try:
    os.mkdir(os.path.join(base_path, 'aug'))
    os.mkdir(os.path.join(base_path, 'aug', 'patient'))
    os.mkdir(os.path.join(base_path, 'aug', 'labels'))
except:
    print("Directories already exist")
#%%
# Define preprocessing arguments
preprocessing_args = {
    'edge_enhance': True,
    'equalization': 'adaptive',
    'blur': 5
}

errors = []  # stores any errors that occur (used in testing)
shape = []  # stores the shape of the final images to ensure they have the same aspect ratio

# Iterate through the images in the original dataset directory
for image in os.listdir(data_dir):
    img = cv2.imread(os.path.join(data_dir, image), cv2.IMREAD_GRAYSCALE)
    
    # Apply preprocessing function
    temp, temp1 = preprocessing_function(
        preprocessing_args['edge_enhance'],
        preprocessing_args['equalization'],
        preprocessing_args['blur'],
        img
    )

    # Define paths for the processed images
    path = os.path.join(target_path, image[:-4] + '_0.png')
    path1 = os.path.join(target_path, image[:-4] + '_1.png')

    # Save the processed images
    cv2.imwrite(path, temp)
    cv2.imwrite(path1, temp1)
#%%
# equalization: histogram or adaptive
preprocessing_args = {
    'edge_enhance': False,
    'equalization': 'False',
    'blur': 0
}
errors = []   # stores any errors that occur (used in testing)
shape = []    # stores the shape of the final images to ensure that they have the same aspect ratio
for image in os.listdir(labels_original):
    img = cv2.imread(labels_original + image, cv2.COLOR_BGR2GRAY)
    temp, temp1 = preprocessing_function(preprocessing_args['edge_enhance'],
                                  preprocessing_args['equalization'],
                                  preprocessing_args['blur'],
                                  img)

    # add a _i to the image name
    path = augmented_labels + image[:-4] + '_0.png'
    cv2.imwrite(path, temp)
    path1 = augmented_labels + image[:-4] + '_1.png'
    cv2.imwrite(path1, temp1)
#%% md
## Preprocessing Testing
#%% md
This is the code that was used to test the different preprocessing processes.
#%%
from keras.callbacks import ModelCheckpoint, EarlyStopping
#%%
# Get the list of image file paths and label file paths, sorted
image_paths = sorted([os.path.join(target_path, file) for file in os.listdir(target_path) if file.endswith(('.png', '.jpg'))])
label_paths = sorted([os.path.join(augmented_labels, file) for file in os.listdir(augmented_labels) if file.endswith(('.png', '.jpg'))])

# Split the dataset into training and validation sets
train_image_paths, val_image_paths, train_label_paths, val_label_paths = train_test_split(image_paths, label_paths, test_size=0.2, random_state=42)

# Create TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices((train_image_paths, train_label_paths))
train_dataset = train_dataset.map(load_and_preprocess)
train_dataset = train_dataset.batch(batch_size)

val_dataset = tf.data.Dataset.from_tensor_slices((val_image_paths, val_label_paths))
val_dataset = val_dataset.map(load_and_preprocess)
val_dataset = val_dataset.batch(batch_size)

# Optionally, you can prefetch and cache the datasets for better performance
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.prefetch(tf.data.experimental.AUTOTUNE)
#%%
if tf.config.list_physical_devices('GPU'):
  print("TensorFlow **IS** using the GPU")
else:
  print("TensorFlow **IS NOT** using the GPU")
#%%
callbacks = [ModelCheckpoint('cohorts.hdf5', verbose=1, save_best_only=True, save_weights_only=True,
                             monitor = 'val_loss'),
             EarlyStopping(monitor = 'val_loss', patience = 16, restore_best_weights = True)]
#%%
# Set NAdam optimizer with learning rate 0.001
nadam_optimizer = tf.keras.optimizers.Nadam(learning_rate=0.001)
#%%
class ResizeLayer(layers.Layer):
    def call(self, inputs, original_input):
        # Resize 'inputs' to match the shape of 'original_input'
        return tf.image.resize(inputs, tf.shape(original_input)[1:3])

def unet_model(target_size):
    original_input = layers.Input(shape=(None, None, 1))
    resized_inputs = tf.image.resize(original_input, target_size)

    # Encoder
    conv1 = layers.Conv2D(64, 3, activation='relu', padding='same')(resized_inputs)
    conv1 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = layers.Conv2D(128, 3, activation='relu', padding='same')(pool1)
    conv2 = layers.Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2)

    # Midpoint
    conv3 = layers.Conv2D(256, 3, activation='relu', padding='same')(pool2)
    conv3 = layers.Conv2D(256, 3, activation='relu', padding='same')(conv3)

    # Decoder
    up1 = layers.UpSampling2D(size=(2, 2))(conv3)
    up1 = layers.Conv2D(128, 2, activation='relu', padding='same')(up1)
    merge1 = layers.concatenate([conv2, up1], axis=3)
    conv4 = layers.Conv2D(128, 3, activation='relu', padding='same')(merge1)
    conv4 = layers.Conv2D(128, 3, activation='relu', padding='same')(conv4)

    up2 = layers.UpSampling2D(size=(2, 2))(conv4)
    up2 = layers.Conv2D(64, 2, activation='relu', padding='same')(up2)
    merge2 = layers.concatenate([conv1, up2], axis=3)
    conv5 = layers.Conv2D(64, 3, activation='relu', padding='same')(merge2)
    conv5 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv5)

    # Output layer
    outputs = layers.Conv2D(1, 1, activation='sigmoid')(conv5)
    resized_outputs = ResizeLayer()(outputs, original_input)

    model = models.Model(inputs=original_input, outputs=resized_outputs)
    return model
#%%
 # Assuming grayscale images
target_size = (224, 224)
model = unet_model(target_size)
#%%
# Combined Generalized Dice Loss and Focal Loss
def GDL_add_focal(gamma=2., alpha=0.25):
    def focal_loss_fixed(y_true, y_pred):
        pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred))
        pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred))
        return -K.mean(alpha * K.pow(1. - pt_1, gamma) * K.log(pt_1 + K.epsilon())) - K.mean(
            (1 - alpha) * K.pow(pt_0, gamma) * K.log(1. - pt_0 + K.epsilon()))

    def generalized_dice_coeff(y_true, y_pred):
        # Calculate sum over the required number of dimensions
        w = K.sum(y_true, axis=(0, 1, 2))
        w = 1 / (w ** 2 + 0.000001)
    
        numerator = y_true * y_pred
        # Reduce sum to the number of dimensions in the 'numerator' tensor
        numerator = w * K.sum(numerator, (0, 1, 2))
        numerator = K.sum(numerator)
    
        denominator = y_true + y_pred
        # Same for denominator
        denominator = w * K.sum(denominator, (0, 1, 2))
        denominator = K.sum(denominator)
    
        gen_dice_coef = 2 * numerator / denominator
        return gen_dice_coef

    def generalized_dice_loss(y_true, y_pred):
        return 1 - generalized_dice_coeff(y_true, y_pred) + 10 * focal_loss_fixed(y_true, y_pred)
    
    return generalized_dice_loss
#%%
def dice_coef_metric(y_true, y_pred):
    intersection = 2.0 * K.sum(y_true * y_pred)
    union = K.sum(y_true) + K.sum(y_pred)
    return intersection / (union + K.epsilon())
#%%
# Compile the model with the specified optimizer, loss, and metric
model.compile(optimizer=nadam_optimizer, loss=GDL_add_focal(gamma=2., alpha=0.25), metrics=[dice_coef_metric])
model.summary()
Leave a Comment