# Paste-site header (not part of the program):
# Untitled
# unknown
# python
# a year ago
# 9.6 kB
# 10
# Indexable
# %%
# Select the GPU *before* TensorFlow is imported and queried. The CUDA runtime
# reads CUDA_VISIBLE_DEVICES when it is initialised, so assigning it after the
# device query below would not change which GPU is used.
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
import tensorflow as tf
# tf.test.is_gpu_available() is deprecated; list_physical_devices('GPU') is the
# documented replacement. Print the result so it is visible when the file is
# run as a plain script as well as in an interactive cell.
print(tf.config.list_physical_devices('GPU'))
#tf.test.is_built_with_cuda()
# %%
import argparse
import os
import json
import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from imutils import paths
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import balanced_accuracy_score, accuracy_score,\
f1_score, recall_score, precision_score, roc_auc_score
from sklearn.preprocessing import LabelBinarizer
from tensorflow.keras.callbacks import (
EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
#Add /home/dslab_lzh/work_data/Mini-COVIDNet/MOBILENetV2 to sys.path
import sys
sys.path.append('/home/dslab_lzh/work_data/Mini-COVIDNet/MOBILENetV2')
from model_mobile import get_model
from utils import Metrics
import pandas as pd
from PIL import Image
os.environ["CUDA_VISIBLE_DEVICES"]="1"
# %%
def load_images(root_dir, data_info_file, use="train", image_size=(224, 224)):
    """Load every frame of one data split into memory.

    The CSV at ``data_info_file`` is filtered on its ``Use`` column; for each
    remaining row the folder ``<root_dir>/<ID>/frames`` is scanned for
    ``.png``/``.jpg``/``.jpeg`` files, which are decoded as RGB and resized.

    Args:
        root_dir: Directory containing one ``<ID>/frames`` folder per CSV row.
        data_info_file: Path to a CSV with ``Use``, ``ID`` and ``Label``
            columns.
        use: Which split to load: ``"train"``, ``"val"`` or ``"test"``
            (matched against ``Use`` values ``Train``/``Val``/``Test``).
        image_size: ``(width, height)`` every frame is resized to.

    Returns:
        A list of ``(image, label, patient_id)`` tuples, where ``image`` is a
        NumPy array of the resized RGB frame and ``label`` is 1 when the row's
        ``Label`` equals ``"COVID-19"`` and 0 otherwise.

    Raises:
        ValueError: If ``use`` is not one of the three supported splits.
    """
    split_names = {"train": "Train", "val": "Val", "test": "Test"}
    split = split_names.get(use)
    if split is None:
        # Fail before touching the filesystem.
        raise ValueError("Use must be one of train, val or test")

    data_frame = pd.read_csv(data_info_file)
    data_frame = data_frame[data_frame["Use"] == split]

    image_extensions = (".png", ".jpg", ".jpeg")
    image_labels = []
    for _, row in data_frame.iterrows():
        patient_id = row["ID"]
        # Convert label string to 1/0
        label = 1 if row["Label"] == "COVID-19" else 0
        img_folder = os.path.join(root_dir, f"{str(patient_id)}/frames")
        # os.listdir returns entries in arbitrary order; sort so the sample
        # order is reproducible between runs and machines.
        for img_file in sorted(os.listdir(img_folder)):
            if not img_file.lower().endswith(image_extensions):
                continue
            # The context manager releases the file handle as soon as the
            # pixels have been copied into the array.
            with Image.open(os.path.join(img_folder, img_file)) as img:
                frame = np.array(img.convert("RGB").resize(image_size))
            image_labels.append((frame, label, patient_id))
    return image_labels
def get_item(idx, image_labels):
    """Return the ``(image, label, patient_id)`` sample stored at ``idx``.

    ``load_images`` stores already-decoded NumPy arrays as the first tuple
    element, so that case is returned directly. For backward compatibility a
    file path in that position is still accepted and is opened and converted
    to RGB.

    Args:
        idx: Index into ``image_labels``.
        image_labels: Sequence of ``(image_or_path, label, patient_id)``
            tuples.

    Returns:
        A tuple ``(image_array, label, patient_id)``.
    """
    image, label, patient_id = image_labels[idx]
    if isinstance(image, (str, bytes, os.PathLike)):
        # Entry holds a path rather than pixel data: decode it now.
        with Image.open(image) as img:
            image = img.convert("RGB")
    return np.asarray(image), label, patient_id
# %%
# Both splits are described by the same CSV and live under the same frame
# directory; only the requested split differs.
_FRAMES_ROOT = "/home/dslab_lzh/data/LUS/covid19_LUS/frames"
_DATA_INFO_CSV = "/home/dslab_lzh/work_data/EID_Comparison/DataLimit/temporary_data_supervised.csv"
train_dataset, test_dataset = (
    load_images(root_dir=_FRAMES_ROOT, data_info_file=_DATA_INFO_CSV, use=split)
    for split in ("train", "test")
)
# Number of test frames that were loaded.
print(len(test_dataset))
#%%
# ---------------------------------------------------------------------------
# Train the network returned by get_model() on the training frames, evaluate
# it on the test frames and store the metrics of every fold as JSON.
# ---------------------------------------------------------------------------

# Experiment configuration (identical for every fold).
MODEL_DIR = '/home/dslab_lzh/work_data/Mini-COVIDNet/covid19_ultrasound/pocovidnet/models'
RESULTS_FILE = "/home/dslab_lzh/work_data/EDI_Results/mini_covidnet_baseline.json"
LR = 1e-4
EPOCHS = 50
BATCH_SIZE = 16
TRAINABLE_BASE_LAYERS = 1
IMG_WIDTH, IMG_HEIGHT = (224, 224)
NUM_CLASSES = 2
model_name = "ireallywanttogiveup.h5"
folds = 1
exp_results = []

# Data preparation does not depend on the fold, so it is done once.
train_labels = np.array([x[1] for x in train_dataset])
test_labels = np.array([x[1] for x in test_dataset])
print(train_labels[0])

# Scale pixel intensities from [0, 255] to [0, 1].
trainX = np.array([x[0] for x in train_dataset], dtype="float32") / 255.0
testX = np.array([x[0] for x in test_dataset], dtype="float32") / 255.0

# The model is compiled with categorical_crossentropy and evaluated with
# argmax(axis=1), both of which need one-hot targets rather than the raw 0/1
# integers produced by load_images.
trainY = to_categorical(train_labels, num_classes=NUM_CLASSES)
testY = to_categorical(test_labels, num_classes=NUM_CLASSES)

print(f'trainX shape: {trainX.shape}')
print(f'trainY shape: {trainY.shape}')
print(f'testX shape: {testX.shape}')
print(f'testY shape: {testY.shape}')

for fold in range(folds):
    FOLD = fold

    # initialize the training data augmentation object
    trainAug = ImageDataGenerator(
        rotation_range=10,
        fill_mode='nearest',
        # horizontal_flip=True,
        # vertical_flip=True,
        width_shift_range=0.1,
        height_shift_range=0.1
    )

    # Build the network (get_model is imported from model_mobile).
    model = get_model(
        input_size=(IMG_WIDTH, IMG_HEIGHT, 3), num_classes=NUM_CLASSES
    )

    # Define callbacks
    # NOTE(review): the test split doubles as validation data for early
    # stopping and checkpoint selection, so the reported test metrics are
    # optimistic. load_images supports use="val" -- consider using it here.
    earlyStopping = EarlyStopping(
        monitor='val_loss',
        patience=20,
        verbose=1,
        mode='min',
        restore_best_weights=True
    )
    mcp_save = ModelCheckpoint(
        os.path.join(MODEL_DIR, 'fold_' + str(FOLD) + '_epoch_{epoch:02d}'),
        save_best_only=True,
        monitor='val_accuracy',
        mode='max',
        verbose=1
    )
    reduce_lr_loss = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.1,
        patience=7,
        verbose=1,
        min_delta=1e-4,  # `epsilon` is the deprecated name of this argument
        mode='min'
    )
    # Callback from the project's utils module (used to show balanced
    # accuracy). Kept under its own name so the results dict below does not
    # overwrite it.
    metrics_callback = Metrics((testX, testY), model)

    # compile model
    print('Compiling model...')
    # `lr` is the deprecated alias of `learning_rate`.
    # NOTE(review): `decay` is only accepted by the legacy Keras optimizers;
    # confirm against the installed TensorFlow version.
    opt = Adam(learning_rate=LR, decay=LR / EPOCHS)
    model.compile(
        loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy']
    )
    print(f'Model has {model.count_params()} parameters')
    # summary() prints the table itself and returns None, so it must not be
    # interpolated into a string.
    model.summary()

    # train the network
    print('Starting training model...')
    # Model.fit accepts generators directly; fit_generator is deprecated.
    # validation_steps is omitted so the whole validation array is evaluated.
    H = model.fit(
        trainAug.flow(trainX, trainY, batch_size=BATCH_SIZE),
        steps_per_epoch=max(1, len(trainX) // BATCH_SIZE),
        validation_data=(testX, testY),
        epochs=EPOCHS,
        callbacks=[earlyStopping, mcp_save, reduce_lr_loss, metrics_callback]
    )

    # make predictions on the testing set
    print('Evaluating network...')
    pred_probs = model.predict(testX, batch_size=BATCH_SIZE)
    # for each image in the testing set we need to find the index of the
    # label with corresponding largest predicted probability
    predIdxs = np.argmax(pred_probs, axis=1)
    y_true = testY.argmax(axis=1)
    y_preds = predIdxs

    print('classification report sklearn:')
    print(
        classification_report(
            y_true, y_preds, labels=[0, 1], target_names=["0", "1"]
        )
    )
    print('confusion matrix:')
    cm = confusion_matrix(y_true, y_preds, labels=[0, 1])
    # show the confusion matrix
    print(cm)

    # serialize the model to disk
    print(f'Saving COVID-19 detector model on {model_name} data...')
    model.save(os.path.join(MODEL_DIR, model_name), save_format='h5')

    ###### calculate metrics ######
    accuracy = accuracy_score(y_true, y_preds)
    recall = recall_score(y_true, y_preds)
    precision = precision_score(y_true, y_preds)
    f1_score_result = f1_score(y_true, y_preds)
    specificity = recall_score(y_true, y_preds, pos_label=0)
    weighted_accuracy = balanced_accuracy_score(y_true, y_preds)
    # AUC needs a continuous score, not the thresholded class index.
    # Assumes column 1 of the model output is the positive-class score
    # (the model is built with num_classes=2) -- TODO confirm.
    auc = roc_auc_score(y_true, pred_probs[:, 1])
    ##### End calculate metrics #####

    # Print the results
    print(f"Accuracy: {accuracy}")
    print('Weighted accuracy: ', weighted_accuracy)
    print(f"Recall: {recall}")
    print(f"Precision: {precision}")
    print(f"Specificity: {specificity}")
    print(f"F1 Score: {f1_score_result}")
    print(f"AUC: {auc}")

    # Plain floats keep the JSON serialisation independent of NumPy types.
    metrics = {
        "accuracy": float(accuracy),
        "recall": float(recall),
        "precision": float(precision),
        "specificity": float(specificity),
        "f1_score": float(f1_score_result),
        "weighted_accuracy": float(weighted_accuracy),
        "auc": float(auc)
    }
    exp_results.append(metrics)

    # Write the results gathered so far to the JSON file; rewriting it after
    # every fold keeps finished folds on disk if a later fold fails.
    with open(RESULTS_FILE, "w", encoding="utf-8") as f:
        json.dump(exp_results, f)

    # plot the training loss and accuracy
    # (disabled; `plot_path` must be defined before re-enabling. N is the
    # number of epochs actually run, which early stopping can make < EPOCHS.)
    # N = len(H.history['loss'])
    # plt.style.use('ggplot')
    # plt.figure()
    # plt.plot(np.arange(0, N), H.history['loss'], label='train_loss')
    # plt.plot(np.arange(0, N), H.history['val_loss'], label='val_loss')
    # plt.plot(np.arange(0, N), H.history['accuracy'], label='train_acc')
    # plt.plot(np.arange(0, N), H.history['val_accuracy'], label='val_acc')
    # plt.title('Training Loss and Accuracy on COVID-19 Dataset')
    # plt.xlabel('Epoch #')
    # plt.ylabel('Loss/Accuracy')
    # plt.legend(loc='lower left')
    # plt.savefig(os.path.join(MODEL_DIR, plot_path))

print('Done, shuttting down!')
# %%
# Paste-site footer (not part of the program):
# Editor is loading...
# Leave a Comment