Tumor_segmentation
unknown
python
2 years ago
11 kB
5
Indexable
import os
import numpy as np
import cv2
import tensorflow as tf
from tensorflow import keras
from keras import layers, models, callbacks
from keras.callbacks import ModelCheckpoint, EarlyStopping
import matplotlib.pyplot as plt
from PIL import Image, ImageFilter
import sklearn
from sklearn.model_selection import train_test_split
import keras.backend as K
import base64
import matplotlib.patches as mpatches

#%%
# Utility functions


def preprocessing_function(edge_enhance, equalization, blur, image):
    """Apply the optional preprocessing steps and return the image and its mirror.

    Args:
        edge_enhance: When truthy, apply PIL's ``ImageFilter.EDGE_ENHANCE``.
        equalization: ``'histogram'`` for global histogram equalization,
            ``'adaptive'`` for CLAHE; any other value skips equalization.
        blur: Gaussian kernel size passed as ``(blur, blur)``; ``0`` disables
            blurring.
        image: Image array as returned by ``cv2.imread``.

    Returns:
        A ``(image, flipped)`` tuple where ``flipped`` is ``image`` mirrored
        with ``cv2.flip(image, 1)``.
    """
    # Gaussian blur (skipped when the kernel size is 0).
    if blur != 0:
        image = cv2.GaussianBlur(image, (blur, blur), 0)

    # Edge enhancement goes through PIL and back to a NumPy array.
    if edge_enhance:
        image = np.array(Image.fromarray(image).filter(ImageFilter.EDGE_ENHANCE))

    # Contrast equalization.
    if equalization == 'histogram':
        image = cv2.equalizeHist(image)
    elif equalization == 'adaptive':
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        image = clahe.apply(image)

    # Augmentation: keep the processed image and add a flipped copy.
    flipped = cv2.flip(image, 1)
    return image, flipped


def _augment_directory(source_dir, output_dir, args, errors):
    """Preprocess every image in ``source_dir`` and write the results.

    Each readable image is loaded as single-channel grayscale, run through
    ``preprocessing_function`` and saved to ``output_dir`` twice:
    ``<stem>_0.png`` (processed) and ``<stem>_1.png`` (processed + flipped).

    Args:
        source_dir: Directory containing the input images.
        output_dir: Directory receiving the PNG outputs (must exist).
        args: Dict with the keys ``'edge_enhance'``, ``'equalization'`` and
            ``'blur'``.
        errors: List that collects the names of files that could not be read
            or written.
    """
    for file_name in os.listdir(source_dir):
        # IMREAD_GRAYSCALE is the correct *imread* flag; cv2.COLOR_BGR2GRAY is
        # a cvtColor code and must not be passed to imread.
        img = cv2.imread(os.path.join(source_dir, file_name), cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread returns None for anything it cannot decode.
            errors.append(file_name)
            continue

        processed, flipped = preprocessing_function(
            args['edge_enhance'], args['equalization'], args['blur'], img
        )

        # splitext copes with extensions of any length (e.g. ".jpeg").
        stem = os.path.splitext(file_name)[0]
        for suffix, result in (('_0.png', processed), ('_1.png', flipped)):
            if not cv2.imwrite(os.path.join(output_dir, stem + suffix), result):
                errors.append(stem + suffix)


def load_and_preprocess(image_path, label_path, target_size=224):
    """Load an image/label pair and letterbox both to a square of ``target_size``.

    Both files are decoded as single-channel images and converted to float32.
    The longer side is scaled to ``target_size`` (aspect ratio preserved) and
    the result is zero-padded, centred, to ``target_size x target_size``.

    Args:
        image_path: Path (string tensor) of the input image.
        label_path: Path (string tensor) of the matching segmentation mask.
        target_size: Side length of the square output.

    Returns:
        ``(image, label)`` float32 tensors; ``label`` is rounded to whole
        numbers after resizing and padding.
    """
    # Load the image and label
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=1)
    label = tf.io.read_file(label_path)
    label = tf.image.decode_png(label, channels=1)

    # Convert both image and label to float32
    image = tf.image.convert_image_dtype(image, tf.float32)
    label = tf.image.convert_image_dtype(label, tf.float32)

    # Scale so that the longer side equals target_size (keeps aspect ratio)
    shape = tf.shape(image)[:2]
    ratio = tf.cast(target_size, tf.float32) / tf.cast(tf.reduce_max(shape), tf.float32)
    new_shape = tf.cast(tf.round(tf.cast(shape, tf.float32) * ratio), tf.int32)

    # Bicubic for the image; nearest-neighbour for the mask so that resizing
    # never invents values that are not present in the label.
    image = tf.image.resize(image, new_shape, method=tf.image.ResizeMethod.BICUBIC)
    label = tf.image.resize(label, new_shape, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    # Pad the image and label to the target size, centring the content
    pad_height = target_size - new_shape[0]
    pad_width = target_size - new_shape[1]
    image = tf.image.pad_to_bounding_box(image, pad_height // 2, pad_width // 2, target_size, target_size)
    label = tf.image.pad_to_bounding_box(label, pad_height // 2, pad_width // 2, target_size, target_size)

    # Round the label to 0 or 1 after resizing and padding
    label = tf.round(label)

    return image, label


#%%
# Configuration Settings
base_path = "/mnt/c/Users/andre/OneDrive/Documents/tumor_segmentation_data"
data_dir = os.path.join(base_path, "patients/imgs/")
target_path = os.path.join(base_path, "aug/patient/")
labels_original = os.path.join(base_path, "patients/labels/")  # path to the original dataset
augmented_labels = os.path.join(base_path, "aug/labels/")  # path to where the preprocessed dataset will be stored
size = None
input_size = (size, size, 1)
batch_size = 2
num_classes = 1

#%%
# Creating directories using the base path.
# makedirs(exist_ok=True) creates each missing directory independently, so an
# already existing 'aug' folder no longer prevents its sub-folders from being
# created.
os.makedirs(os.path.join(base_path, 'aug', 'patient'), exist_ok=True)
os.makedirs(os.path.join(base_path, 'aug', 'labels'), exist_ok=True)

#%%
# Define preprocessing arguments for the input images
preprocessing_args = {
    'edge_enhance': True,
    'equalization': 'adaptive',
    'blur': 5,
}
errors = []  # names of files that could not be read or written
shape = []  # stores the shape of the final images to ensure they have the same aspect ratio

# Preprocess and augment every image in the original dataset directory
_augment_directory(data_dir, target_path, preprocessing_args, errors)
if errors:
    print(f"Skipped {len(errors)} image file(s): {errors}")

#%%
# equalization: histogram or adaptive
# Labels are only flipped: no blur, edge enhancement or equalization.
preprocessing_args = {
    'edge_enhance': False,
    'equalization': 'False',  # any value other than 'histogram'/'adaptive' disables it
    'blur': 0,
}
errors = []  # names of files that could not be read or written
shape = []  # stores the shape of the final images to ensure that they have the same aspect ratio

_augment_directory(labels_original, augmented_labels, preprocessing_args, errors)
if errors:
    print(f"Skipped {len(errors)} label file(s): {errors}")

#%% md
# ## Preprocessing Testing
#%% md
# This is the code that was used to test the different preprocessing processes.

#%%
# Get the list of image file paths and label file paths, sorted.
# NOTE(review): images and labels are paired purely by sort order, which
# assumes matching file names in both folders - confirm for your dataset.
image_paths = sorted(
    os.path.join(target_path, file)
    for file in os.listdir(target_path)
    if file.endswith(('.png', '.jpg'))
)
label_paths = sorted(
    os.path.join(augmented_labels, file)
    for file in os.listdir(augmented_labels)
    if file.endswith(('.png', '.jpg'))
)

# Split the dataset into training and validation sets
train_image_paths, val_image_paths, train_label_paths, val_label_paths = train_test_split(
    image_paths, label_paths, test_size=0.2, random_state=42
)

# Create TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices((train_image_paths, train_label_paths))
train_dataset = train_dataset.map(load_and_preprocess)
train_dataset = train_dataset.batch(batch_size)

val_dataset = tf.data.Dataset.from_tensor_slices((val_image_paths, val_label_paths))
val_dataset = val_dataset.map(load_and_preprocess)
val_dataset = val_dataset.batch(batch_size)

# Prefetch so that input loading overlaps with training
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.prefetch(tf.data.experimental.AUTOTUNE)

#%%
if tf.config.list_physical_devices('GPU'):
    print("TensorFlow **IS** using the GPU")
else:
    print("TensorFlow **IS NOT** using the GPU")

#%%
# NOTE: this list rebinds the name `callbacks`, shadowing the `keras.callbacks`
# module imported at the top of the file.
callbacks = [
    ModelCheckpoint('cohorts.hdf5', verbose=1, save_best_only=True,
                    save_weights_only=True, monitor='val_loss'),
    EarlyStopping(monitor='val_loss', patience=16, restore_best_weights=True),
]

#%%
# Set NAdam optimizer with learning rate 0.001
nadam_optimizer = tf.keras.optimizers.Nadam(learning_rate=0.001)


#%%
class ResizeLayer(layers.Layer):
    """Resize a feature map to the spatial size of a reference tensor."""

    def call(self, inputs, original_input):
        # Resize 'inputs' to match the height/width of 'original_input'
        return tf.image.resize(inputs, tf.shape(original_input)[1:3])


def unet_model(target_size):
    """Build a two-level U-Net for single-channel inputs of arbitrary size.

    The input is resized to ``target_size`` before the encoder, and the
    sigmoid output is resized back to the spatial size of the original input.

    Args:
        target_size: ``(height, width)`` used internally by the network. The
            encoder pools by 2 twice and the decoder upsamples by 2 twice, so
            both values need to be divisible by 4 for the skip connections to
            line up.

    Returns:
        An uncompiled Keras ``Model``.
    """
    original_input = layers.Input(shape=(None, None, 1))
    resized_inputs = tf.image.resize(original_input, target_size)

    # Encoder
    conv1 = layers.Conv2D(64, 3, activation='relu', padding='same')(resized_inputs)
    conv1 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = layers.Conv2D(128, 3, activation='relu', padding='same')(pool1)
    conv2 = layers.Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2)

    # Midpoint
    conv3 = layers.Conv2D(256, 3, activation='relu', padding='same')(pool2)
    conv3 = layers.Conv2D(256, 3, activation='relu', padding='same')(conv3)

    # Decoder
    up1 = layers.UpSampling2D(size=(2, 2))(conv3)
    up1 = layers.Conv2D(128, 2, activation='relu', padding='same')(up1)
    merge1 = layers.concatenate([conv2, up1], axis=3)
    conv4 = layers.Conv2D(128, 3, activation='relu', padding='same')(merge1)
    conv4 = layers.Conv2D(128, 3, activation='relu', padding='same')(conv4)

    up2 = layers.UpSampling2D(size=(2, 2))(conv4)
    up2 = layers.Conv2D(64, 2, activation='relu', padding='same')(up2)
    merge2 = layers.concatenate([conv1, up2], axis=3)
    conv5 = layers.Conv2D(64, 3, activation='relu', padding='same')(merge2)
    conv5 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv5)

    # Output layer
    outputs = layers.Conv2D(1, 1, activation='sigmoid')(conv5)
    resized_outputs = ResizeLayer()(outputs, original_input)

    model = models.Model(inputs=original_input, outputs=resized_outputs)
    return model


#%%
# Assuming grayscale images
target_size = (224, 224)
model = unet_model(target_size)


#%%
# Combined Generalized Dice Loss and Focal Loss
def GDL_add_focal(gamma=2., alpha=0.25):
    """Return a loss combining generalized Dice loss with 10x focal loss.

    Args:
        gamma: Focusing exponent of the focal term.
        alpha: Weight of the positive class in the focal term; the negative
            class is weighted by ``1 - alpha``.

    Returns:
        A ``loss(y_true, y_pred)`` callable usable with ``model.compile``.
    """

    def focal_loss_fixed(y_true, y_pred):
        # Predictions at positive pixels (1 elsewhere, so log(pt_1) -> 0 there)
        pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred))
        # Predictions at negative pixels (0 elsewhere, so log(1 - pt_0) -> 0)
        pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred))
        positive_term = K.mean(alpha * K.pow(1. - pt_1, gamma) * K.log(pt_1 + K.epsilon()))
        negative_term = K.mean((1 - alpha) * K.pow(pt_0, gamma) * K.log(1. - pt_0 + K.epsilon()))
        return -positive_term - negative_term

    def generalized_dice_coeff(y_true, y_pred):
        # Per-channel weight: inverse squared label volume, summed over the
        # batch and both spatial axes
        w = K.sum(y_true, axis=(0, 1, 2))
        w = 1 / (w ** 2 + 0.000001)

        # Weighted overlap, reduced over the same axes and then over channels
        numerator = y_true * y_pred
        numerator = w * K.sum(numerator, (0, 1, 2))
        numerator = K.sum(numerator)

        # Same reduction for the denominator
        denominator = y_true + y_pred
        denominator = w * K.sum(denominator, (0, 1, 2))
        denominator = K.sum(denominator)

        gen_dice_coef = 2 * numerator / denominator
        return gen_dice_coef

    def generalized_dice_loss(y_true, y_pred):
        return 1 - generalized_dice_coeff(y_true, y_pred) + 10 * focal_loss_fixed(y_true, y_pred)

    return generalized_dice_loss


#%%
def dice_coef_metric(y_true, y_pred):
    """Return the Dice coefficient 2*sum(t*p) / (sum(t) + sum(p) + epsilon)."""
    intersection = 2.0 * K.sum(y_true * y_pred)
    union = K.sum(y_true) + K.sum(y_pred)
    return intersection / (union + K.epsilon())


#%%
# Compile the model with the specified optimizer, loss, and metric
model.compile(optimizer=nadam_optimizer, loss=GDL_add_focal(gamma=2., alpha=0.25), metrics=[dice_coef_metric])
model.summary()
Editor is loading...
Leave a Comment