# Tumor_segmentation
import os
import numpy as np
import cv2
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from PIL import Image, ImageFilter
from sklearn.model_selection import train_test_split
from tensorflow.keras import backend as K
#%%
# Utility functions
def preprocessing_function(edge_enhance, equalization, blur, image):
    # Apply Gaussian blur (kernel size must be odd; 0 disables blurring)
    if blur != 0:
        image = cv2.GaussianBlur(image, (blur, blur), 0)
    # Edge enhancement using PIL
    if edge_enhance:
        image = Image.fromarray(image)
        image = image.filter(ImageFilter.EDGE_ENHANCE)
        image = np.array(image)
    # Contrast equalization: 'histogram' or 'adaptive' (CLAHE); any other value is a no-op
    if equalization == 'histogram':
        image = cv2.equalizeHist(image)
    elif equalization == 'adaptive':
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        image = clahe.apply(image)
    # Return the processed image and a horizontally flipped copy for augmentation
    image1 = cv2.flip(image, 1)
    return image, image1
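#%%
# Illustrative sanity check (not part of the original script): run the
# preprocessing function on a synthetic grayscale image and confirm it returns
# the processed image plus a horizontally flipped copy of the same shape.
_test_img = np.random.randint(0, 256, (64, 64), dtype=np.uint8)
_proc, _flipped = preprocessing_function(True, 'adaptive', 5, _test_img)
assert _proc.shape == _flipped.shape == _test_img.shape
assert np.array_equal(_flipped, cv2.flip(_proc, 1))
#%%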
def load_and_preprocess(image_path, label_path, target_size=224):
    # Load the image and label
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=1)
    label = tf.io.read_file(label_path)
    label = tf.image.decode_png(label, channels=1)
    # Convert both image and label to float32 in [0, 1]
    image = tf.image.convert_image_dtype(image, tf.float32)
    label = tf.image.convert_image_dtype(label, tf.float32)
    # Compute a new shape that preserves the aspect ratio
    shape = tf.shape(image)[:2]
    ratio = tf.cast(target_size, tf.float32) / tf.cast(tf.reduce_max(shape), tf.float32)
    new_shape = tf.cast(tf.round(tf.cast(shape, tf.float32) * ratio), tf.int32)
    # Use bicubic interpolation for resizing
    image = tf.image.resize(image, new_shape, method=tf.image.ResizeMethod.BICUBIC)
    label = tf.image.resize(label, new_shape, method=tf.image.ResizeMethod.BICUBIC)
    # Pad the image and label to the target size, centring the content
    pad_height = target_size - new_shape[0]
    pad_width = target_size - new_shape[1]
    image = tf.image.pad_to_bounding_box(image, pad_height // 2, pad_width // 2, target_size, target_size)
    label = tf.image.pad_to_bounding_box(label, pad_height // 2, pad_width // 2, target_size, target_size)
    # Re-binarize the label; bicubic interpolation can overshoot slightly,
    # so clip after rounding to guarantee values in {0, 1}
    label = tf.clip_by_value(tf.round(label), 0.0, 1.0)
    return image, label
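#%%
# Illustrative sanity check (not from the original run; the /tmp path is a
# placeholder): round-trip a synthetic PNG through load_and_preprocess and
# confirm the padded 224x224 output and a binary {0, 1} label.
_tmp = tf.cast(tf.random.uniform((100, 80, 1), 0, 255, dtype=tf.int32), tf.uint8)
tf.io.write_file('/tmp/_sanity.png', tf.image.encode_png(_tmp))
_img, _lbl = load_and_preprocess('/tmp/_sanity.png', '/tmp/_sanity.png')
print(_img.shape, _lbl.shape)  # expected: (224, 224, 1) for both
print(tf.reduce_min(_lbl).numpy(), tf.reduce_max(_lbl).numpy())  # values in {0.0, 1.0}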
#%%
# Configuration Settings
base_path = "/mnt/c/Users/andre/OneDrive/Documents/tumor_segmentation_data"
data_dir = os.path.join(base_path, "patients/imgs/")
target_path = os.path.join(base_path, "aug/patient/")
labels_original = os.path.join(base_path, "patients/labels/")  # path to the original label masks
augmented_labels = os.path.join(base_path, "aug/labels/")  # path where the preprocessed label masks will be stored
size = None  # the network is fully convolutional, so no fixed input size is enforced
input_size = (size, size, 1)
batch_size = 2
num_classes = 1
#%%
# Creating directories using the base path
os.makedirs(os.path.join(base_path, 'aug', 'patient'), exist_ok=True)
os.makedirs(os.path.join(base_path, 'aug', 'labels'), exist_ok=True)
#%%
# Define preprocessing arguments
preprocessing_args = {
    'edge_enhance': True,
    'equalization': 'adaptive',
    'blur': 5
}
errors = []  # stores any errors that occur (used in testing)
shape = []   # stores the shape of the final images to ensure they have the same aspect ratio
# Iterate over the images in the original dataset directory
for image in os.listdir(data_dir):
    img = cv2.imread(os.path.join(data_dir, image), cv2.IMREAD_GRAYSCALE)
    # Apply the preprocessing function
    temp, temp1 = preprocessing_function(
        preprocessing_args['edge_enhance'],
        preprocessing_args['equalization'],
        preprocessing_args['blur'],
        img
    )
    # Save the processed image and its flipped copy with _0/_1 suffixes
    path = os.path.join(target_path, image[:-4] + '_0.png')
    path1 = os.path.join(target_path, image[:-4] + '_1.png')
    cv2.imwrite(path, temp)
    cv2.imwrite(path1, temp1)
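#%%
# Illustrative check (assumes the directories contain only images): every
# source image should yield exactly two augmented files (_0 and _1).
print(len(os.listdir(data_dir)), 'source images ->', len(os.listdir(target_path)), 'augmented images')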
#%%
# Labels are binary masks, so no enhancement is applied; any equalization value
# other than 'histogram' or 'adaptive' disables equalization
preprocessing_args = {
    'edge_enhance': False,
    'equalization': None,
    'blur': 0
}
errors = []  # stores any errors that occur (used in testing)
shape = []   # stores the shape of the final images to ensure they have the same aspect ratio
for image in os.listdir(labels_original):
    # cv2.imread expects an imread flag here, not a color-conversion code
    img = cv2.imread(os.path.join(labels_original, image), cv2.IMREAD_GRAYSCALE)
    temp, temp1 = preprocessing_function(preprocessing_args['edge_enhance'],
                                         preprocessing_args['equalization'],
                                         preprocessing_args['blur'],
                                         img)
    # Append _0/_1 suffixes to match the augmented image names
    path = os.path.join(augmented_labels, image[:-4] + '_0.png')
    cv2.imwrite(path, temp)
    path1 = os.path.join(augmented_labels, image[:-4] + '_1.png')
    cv2.imwrite(path1, temp1)
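#%%
# Illustrative spot check (assumes images and labels share filenames): overlay
# a preprocessed label mask on its image to verify the pairs still line up.
_fname = sorted(os.listdir(target_path))[0]
_img = cv2.imread(os.path.join(target_path, _fname), cv2.IMREAD_GRAYSCALE)
_lbl = cv2.imread(os.path.join(augmented_labels, _fname), cv2.IMREAD_GRAYSCALE)
plt.imshow(_img, cmap='gray')
plt.imshow(np.ma.masked_where(_lbl == 0, _lbl), cmap='autumn', alpha=0.5)
plt.legend(handles=[mpatches.Patch(color='red', alpha=0.5, label='tumor mask')])
plt.title(_fname)
plt.show()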
#%% md
## Preprocessing Testing
#%% md
This is the code that was used to test the different preprocessing options.
#%%
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
#%%
# Get the sorted lists of image and label file paths; pairing relies on
# images and labels sharing the same sorted filenames
image_paths = sorted([os.path.join(target_path, file) for file in os.listdir(target_path) if file.endswith(('.png', '.jpg'))])
label_paths = sorted([os.path.join(augmented_labels, file) for file in os.listdir(augmented_labels) if file.endswith(('.png', '.jpg'))])
# Split the dataset into training and validation sets
train_image_paths, val_image_paths, train_label_paths, val_label_paths = train_test_split(
    image_paths, label_paths, test_size=0.2, random_state=42)
# Create TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices((train_image_paths, train_label_paths))
train_dataset = train_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.batch(batch_size)
val_dataset = tf.data.Dataset.from_tensor_slices((val_image_paths, val_label_paths))
val_dataset = val_dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(batch_size)
# Prefetch so input preprocessing overlaps with model execution
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)
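#%%
# Quick illustrative check: pull one batch and confirm the expected shapes.
for imgs, lbls in train_dataset.take(1):
    print(imgs.shape, lbls.shape)  # expected: (2, 224, 224, 1) for both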
#%%
if tf.config.list_physical_devices('GPU'):
    print("TensorFlow **IS** using the GPU")
else:
    print("TensorFlow **IS NOT** using the GPU")
#%%
callbacks = [ModelCheckpoint('cohorts.hdf5', verbose=1, save_best_only=True, save_weights_only=True,
                             monitor='val_loss'),
             EarlyStopping(monitor='val_loss', patience=16, restore_best_weights=True)]
#%%
# Set NAdam optimizer with learning rate 0.001
nadam_optimizer = tf.keras.optimizers.Nadam(learning_rate=0.001)
#%%
class ResizeLayer(layers.Layer):
    """Resizes `inputs` back to the spatial resolution of `original_input`."""
    def call(self, inputs, original_input):
        return tf.image.resize(inputs, tf.shape(original_input)[1:3])

def unet_model(target_size):
    # Accept variable-sized grayscale inputs; resize to the working resolution
    original_input = layers.Input(shape=(None, None, 1))
    resized_inputs = tf.image.resize(original_input, target_size)
    # Encoder
    conv1 = layers.Conv2D(64, 3, activation='relu', padding='same')(resized_inputs)
    conv1 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = layers.Conv2D(128, 3, activation='relu', padding='same')(pool1)
    conv2 = layers.Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2)
    # Bottleneck
    conv3 = layers.Conv2D(256, 3, activation='relu', padding='same')(pool2)
    conv3 = layers.Conv2D(256, 3, activation='relu', padding='same')(conv3)
    # Decoder with skip connections
    up1 = layers.UpSampling2D(size=(2, 2))(conv3)
    up1 = layers.Conv2D(128, 2, activation='relu', padding='same')(up1)
    merge1 = layers.concatenate([conv2, up1], axis=3)
    conv4 = layers.Conv2D(128, 3, activation='relu', padding='same')(merge1)
    conv4 = layers.Conv2D(128, 3, activation='relu', padding='same')(conv4)
    up2 = layers.UpSampling2D(size=(2, 2))(conv4)
    up2 = layers.Conv2D(64, 2, activation='relu', padding='same')(up2)
    merge2 = layers.concatenate([conv1, up2], axis=3)
    conv5 = layers.Conv2D(64, 3, activation='relu', padding='same')(merge2)
    conv5 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv5)
    # Output layer: per-pixel tumor probability, resized back to the input resolution
    outputs = layers.Conv2D(1, 1, activation='sigmoid')(conv5)
    resized_outputs = ResizeLayer()(outputs, original_input)
    model = models.Model(inputs=original_input, outputs=resized_outputs)
    return model
#%%
# Assuming grayscale images
target_size = (224, 224)
model = unet_model(target_size)
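#%%
# Because ResizeLayer maps the output back to the input resolution, the model
# accepts variable-sized inputs; a quick illustrative check on dummy data:
print(model(tf.zeros((1, 256, 192, 1))).shape)  # expected: (1, 256, 192, 1)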
#%%
# Combined Generalized Dice Loss and Focal Loss
def GDL_add_focal(gamma=2., alpha=0.25):
    def focal_loss_fixed(y_true, y_pred):
        # Standard binary focal loss (Lin et al., 2017)
        pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred))
        pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred))
        return -K.mean(alpha * K.pow(1. - pt_1, gamma) * K.log(pt_1 + K.epsilon())) - K.mean(
            (1 - alpha) * K.pow(pt_0, gamma) * K.log(1. - pt_0 + K.epsilon()))

    def generalized_dice_coeff(y_true, y_pred):
        # Per-channel weights: inverse squared volume of each class
        w = K.sum(y_true, axis=(0, 1, 2))
        w = 1 / (w ** 2 + 0.000001)
        numerator = y_true * y_pred
        numerator = w * K.sum(numerator, (0, 1, 2))
        numerator = K.sum(numerator)
        denominator = y_true + y_pred
        denominator = w * K.sum(denominator, (0, 1, 2))
        denominator = K.sum(denominator)
        gen_dice_coef = 2 * numerator / denominator
        return gen_dice_coef

    def generalized_dice_loss(y_true, y_pred):
        # The factor of 10 weights the focal term relative to the Dice term
        return 1 - generalized_dice_coeff(y_true, y_pred) + 10 * focal_loss_fixed(y_true, y_pred)

    return generalized_dice_loss
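#%%
# Illustrative sanity check of the combined loss on dummy tensors (shapes are
# arbitrary): a perfect prediction should score near 0, an inverted one much higher.
_y = tf.constant([[[[1.0]], [[0.0]]]])  # shape (1, 2, 1, 1)
_loss_fn = GDL_add_focal()
print(_loss_fn(_y, _y).numpy())        # ~0 for a perfect prediction
print(_loss_fn(_y, 1.0 - _y).numpy())  # large for an inverted prediction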
#%%
def dice_coef_metric(y_true, y_pred):
    # Soft Dice coefficient: 2 * |A ∩ B| / (|A| + |B|)
    intersection = 2.0 * K.sum(y_true * y_pred)
    union = K.sum(y_true) + K.sum(y_pred)
    return intersection / (union + K.epsilon())
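#%%
# Quick illustrative check of the Dice metric: identical masks give ~1.0.
_m = tf.constant([1.0, 0.0, 1.0, 1.0])
print(dice_coef_metric(_m, _m).numpy())  # expected: ~1.0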
#%%
# Compile the model with the specified optimizer, loss, and metric
model.compile(optimizer=nadam_optimizer, loss=GDL_add_focal(gamma=2., alpha=0.25), metrics=[dice_coef_metric])
model.summary()