Untitled
unknown
plain_text
3 years ago
24 kB
2
Indexable
""" Created At: 14/07/2021 15:39 """
import random
import time
import warnings
from tensorflow.python.keras.callbacks import ModelCheckpoint
import src.utility.utils
from attacker.SuccessRateComputationCallback import SuccessRateComputationCallback
from attacker.autoencoder import MnistAutoEncoder
from utility.autoencoder_config import AutoencoderConfig
from attacker.constants import *
from attacker.attacker import Attacker
from attacker.losses import AE_LOSSES
from data_preprocessing.mnist import MnistPreprocessing
from utility.config import attack_config
from utility.constants import *
from utility.optimize_advs import optimize_advs
from utility.statistics import *
from utility.utils import *

# NOTE(review): np, tf, plt, MyLogger, os and most helper functions
# (check_inside_range, mkdirs, filter_candidate_adv, compute_distance, ...)
# are presumably provided by the star imports above -- confirm against
# utility.constants / utility.statistics / utility.utils.
warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)
# NOTE(review): experimental_run_functions_eagerly is deprecated in newer TF
# (tf.config.run_functions_eagerly) -- kept as-is for the pinned TF version.
tf.config.experimental_run_functions_eagerly(True)
logger = MyLogger.getLog()


class HPBA(Attacker):
    """HPBA attack.

    Trains (or loads) an autoencoder whose loss combines an image-quality term
    with a classification term (weighted by ``weight``), uses it to generate
    adversarial candidates for images of one origin class, filters the
    successful candidates against the classifier, and optionally runs a
    recovery/optimization phase that reduces the perturbation (L0/L2/SSIM
    statistics are kept before and after that phase).
    """

    def __init__(self, origin_label, trainX, trainY, target_classifier, substitute_classifier, weight,
                 target_label=None,
                 # target_position=2,
                 step_to_recover=12.,
                 num_images_to_attack=1000,
                 pattern=ALL_PATTERN,
                 num_class=MNIST_NUM_CLASSES,
                 num_images_to_train=1000,
                 use_optimize_phase=True,
                 attack_type=ATTACK_WHITEBOX_TYPE,
                 substitute_classifier_name=None,
                 attack_stop_condition=None,
                 autoencoder_config: AutoencoderConfig = None,
                 quality_loss_str=LOSS_MSE):
        """Configure the attack and precompute all result/model file paths.

        :param origin_label: class (a number) whose images are attacked
        :param trainX, trainY: training data handed to the base ``Attacker``
        :param target_classifier: model under attack
        :param substitute_classifier: surrogate model; when not None the attack
            runs in black-box mode against the surrogate
        :param weight: beta weight balancing quality loss vs. attack loss
        :param target_label: desired misclassification label; None (or the
            base class's untargeted convention) means untargeted
        :param step_to_recover: multiplier on avg L0 used to derive the
            recovery step of the optimization phase
        :param num_images_to_attack: cap on images fed to the trained
            autoencoder when generating candidates
        :param use_optimize_phase: whether to run the perturbation-reduction
            phase after candidate filtering
        :param autoencoder_config: epochs/batch size/learning rate and an
            optional pre-built autoencoder model
        :param quality_loss_str: name of the image-quality loss (e.g. MSE)
        """
        super().__init__(trainX=trainX, trainY=trainY, target_classifier=target_classifier,
                         substitute_classifier=substitute_classifier,
                         substitute_classifier_name=substitute_classifier_name,
                         target_label=target_label,
                         # target_position=target_position,
                         num_class=num_class, method_name=HPBA_METHOD_NAME, attack_type=attack_type,
                         origin_label=origin_label, attack_stop_condition=attack_stop_condition,
                         quality_loss_str=quality_loss_str)
        self.weight = weight
        self.step_to_recover = step_to_recover
        # If there are fewer origin images than num_images_to_train, attack all
        # of them; otherwise honor the requested cap.
        # NOTE(review): the condition compares against num_images_to_train, not
        # num_images_to_attack -- looks intentional but worth confirming.
        self.num_images_to_attack = num_images_to_attack if len(self.origin_images) > num_images_to_train else len(
            self.origin_images)
        # self.max_number_advs_to_optimize = max_number_advs_to_optimize
        self.pattern = pattern
        self.num_images_to_train = num_images_to_train
        self.autoencoder_config = autoencoder_config
        # True when the data lies entirely in [0, 1] or [0, 255]; image export
        # is only attempted in that case.
        self.is_data_inside_0_1_or_0_255 = check_inside_range(self.origin_images,
                                                              checked_range=(0, 1)) or check_inside_range(
            self.origin_images, checked_range=(0, 255))
        # Shared file-name stem encoding the attack configuration; '.' in the
        # weight is replaced by ',' to keep file names portable.
        if self.attack_type == ATTACK_WHITEBOX_TYPE:
            self.file_shared_name = '{method_name}_{classifier_name}_ori={origin_label}_tar={target_label}_weight={weight}_num={num_images}_loss={quality_loss}'.format(
                method_name=self.method_name, classifier_name=self.classifier_name,
                origin_label=self.origin_label,
                target_label=-1 if self.is_untargeted() else self.target_label,
                weight=str(self.weight).replace('.', ','),
                num_images=self.num_images_to_train,
                quality_loss=self.quality_loss_str)
        else:
            # NOTE(review): this pattern has no underscore before 'loss='
            # ('num={num_images}loss='), unlike the white-box pattern above --
            # probably a typo, but fixing it would orphan previously saved
            # autoencoder files, so it is left untouched here.
            self.file_shared_name = '{method_name}_{classifier_name}_{sub_model_name}_ori={origin_label}_tar={target_label}_weight={weight}_num={num_images}loss={quality_loss}'.format(
                method_name=self.method_name, classifier_name=self.classifier_name,
                origin_label=self.origin_label,
                target_label=-1 if self.is_untargeted() else self.target_label,
                weight=str(self.weight).replace('.', ','),
                num_images=self.num_images_to_train,
                sub_model_name=self.substitute_cls_name,
                quality_loss=self.quality_loss_str)
        # Create all output folders up front (autoencoder, summaries, data,
        # images); folder attributes come from the Attacker base class.
        self.autoencoder_folder = os.path.join(self.general_result_folder, TEXT_AUTOENCODER)
        mkdirs([self.autoencoder_folder])
        mkdirs([self.general_result_folder, self.result_summary_folder, self.data_folder, self.image_folder])
        # self.images_folder = os.path.join(self.general_result_folder, TEXT_IMAGE)
        self.autoencoder = None
        self.autoencoder_file_path = os.path.join(self.autoencoder_folder,
                                                  self.file_shared_name + '_' + TEXT_AUTOENCODER + '.h5')
        self.optimal_epoch = None
        self.smooth_adv_speed = None
        self.optimized_adv = None
        self.optimized_adv_0_255 = None
        self.optimized_adv_path = os.path.join(self.data_folder,
                                               self.short_file_shared_name + '_optimized_adv_' + self.shared_time_stamp + '.npy')
        # Perturbation statistics, filled in by attack(): distances between
        # adversarial and clean images before/after the optimization phase.
        self.L0_befores = None
        self.L0_afters = None
        self.L2_befores = None
        self.L2_afters = None
        self.SSIM_befores = None
        self.SSIM_afters = None
        self.use_optimize_phase = use_optimize_phase

    def attack(self):
        """Run the full attack pipeline.

        1. Load a cached autoencoder (or the one supplied via
           ``autoencoder_config``), otherwise build and train one with the
           combined quality/attack loss.
        2. Feed origin images through the autoencoder and keep only candidates
           that actually fool the classifier (``filter_candidate_adv``).
        3. Optionally run the perturbation-reduction phase (``optimize_advs``).

        Populates ``self.adv_result`` / ``self.origin_adv_result`` (and the
        optimized variants), the L0/L2/SSIM statistics, and saves the arrays
        to disk. Returns early (with ``adv_result`` empty/None) when no
        adversarial example could be generated.
        """
        input_range = get_range_of_input(self.origin_images)
        ae_trainee = MnistAutoEncoder(input_range=input_range)
        # The model the autoencoder is trained against: the target classifier
        # in white-box mode, the surrogate in black-box mode.
        white_box_classifier = self.classifier if self.substitute_classifier is None else self.substitute_classifier

        if check_path_exists(self.autoencoder_file_path) and self.autoencoder_config.autoencoder_model is None:
            # Reuse a previously trained autoencoder for this configuration.
            logger.debug(self.shared_log
                         + f'found pre-trained autoencoder for: origin_label = {self.origin_label}, target_label = {self.target_label}')
            # compile=False: the custom combined loss is not needed for inference.
            self.autoencoder = tf.keras.models.load_model(
                self.autoencoder_file_path,
                compile=False,
                custom_objects={
                    'range_custom_activation': wrap_range_custom_activation(
                        min_value=input_range[0], max_value=input_range[1])
                })
        else:
            logger.debug(self.shared_log
                         + f'not found pre-trained autoencoder for: origin_label = {self.origin_label}, target_label = {self.target_label}')
            logger.debug(
                self.shared_log
                + f'training autoencoder for: origin_label={self.origin_label}, target_label={self.target_label}')
            if self.autoencoder_config.autoencoder_model is not None:
                # Caller supplied a ready-made (untrained) architecture.
                self.autoencoder = self.autoencoder_config.autoencoder_model
            else:
                # Pick an architecture by channel count: 3 channels -> the 3-D
                # variant, otherwise the adaptive (grayscale) variant.
                if self.trainX[0].shape[-1] == 3:
                    self.autoencoder = ae_trainee.get_3d_atchitecture(input_shape=self.trainX[0].shape)
                else:
                    self.autoencoder = ae_trainee.apdative_architecture(input_shape=self.trainX[0].shape)
            adam = tf.keras.optimizers.Adam(learning_rate=self.autoencoder_config.learning_rate, beta_1=0.9,
                                            beta_2=0.999, amsgrad=False)
            # Combined loss: quality term (weighted by beta=self.weight) plus
            # the (un)targeted classification term against white_box_classifier.
            au_loss = AE_LOSSES.general_loss(classifier=white_box_classifier, target_vector=self.target_vector,
                                             beta=self.weight, input_shape=self.trainX[0].shape,
                                             is_untargeted=self.is_untargeted(),
                                             quality_loss_str=self.quality_loss_str,
                                             num_class=self.num_class)
            self.autoencoder.compile(optimizer=adam, loss=au_loss)

            # add callbacks and train autoencoder
            # early_stopping = EarlyStopping(monitor='loss', verbose=0, mode='min', min_delta=0.001, patience=20)
            model_checkpoint = ModelCheckpoint(self.autoencoder_file_path, save_best_only=True, monitor='loss',
                                               mode='min')
            # Periodically reports the attack success rate during training and
            # can stop training via attack_stop_condition.
            sr_computation = SuccessRateComputationCallback(self.origin_images[:self.num_images_to_train],
                                                            self.target_label, self.classifier,
                                                            self.attack_stop_condition, self.is_untargeted(),
                                                            self.origin_label,
                                                            print_every_epoch=self.autoencoder_config.print_result_every_epochs,
                                                            num_class=self.num_class)
            # Autoencoder is trained to reconstruct the origin images (input ==
            # target); the attack behaviour comes entirely from au_loss.
            history = self.autoencoder.fit(self.origin_images[:self.num_images_to_train],
                                           self.origin_images[:self.num_images_to_train],
                                           epochs=self.autoencoder_config.epochs,
                                           batch_size=self.autoencoder_config.batch_size,
                                           # callbacks=[model_checkpoint, sr_computation, early_stopping],
                                           callbacks=[model_checkpoint, sr_computation],
                                           verbose=1)
            self.autoencoder.save(self.autoencoder_file_path)
            logger.debug(self.shared_log + 'training autoencoder DONE!')
            # Number of epochs actually run (may be fewer than configured if a
            # callback stopped training early).
            self.optimal_epoch = len(history.history['loss'])

        logger.debug(self.shared_log + 'filtering generated candidates!')
        generated_candidates = self.autoencoder.predict(self.origin_images[:self.num_images_to_attack])
        # Candidate ranking for the optimization phase: confidence-of-interest
        # in white-box mode, sequential in black-box mode.
        ranking_strategy = COI_RANKING
        optimized_classifier = self.classifier
        if self.substitute_classifier is not None:
            ranking_strategy = SEQUENTIAL_RANKING
        # Keep only candidates that actually fool the classifier; also returns
        # the matching clean images and their one-hot labels.
        self.adv_result, _, self.origin_adv_result, result_origin_labels_vector = filter_candidate_adv(
            origin_data=self.origin_images[:self.num_images_to_attack],
            candidate_adv=generated_candidates,
            target_label=self.target_label,
            cnn_model=optimized_classifier,
            is_untargeted=self.is_untargeted(),
            origin_label=self.origin_label,
            num_class=self.num_class)
        logger.debug(self.shared_log + 'filtering generated candidates DONE!')

        if self.adv_result is None or len(self.adv_result) == 0:
            # Attack failed outright; record end time and bail out.
            self.end_time = time.time()
            logger.error("Cannot generate adv")
            return

        self.L0_befores, self.L2_befores, self.SSIM_befores = compute_distance(self.adv_result,
                                                                               self.origin_adv_result)
        # if found adv, continue optimization phase
        if self.use_optimize_phase is True:
            logger.debug(self.shared_log + 'optimizing generated advs')
            # Recovery step scales with the average L0 perturbation size.
            actual_recover_step = round(self.step_to_recover * np.average(self.L0_befores))
            self.optimized_adv_0_255 = optimize_advs(classifier=optimized_classifier,
                                                     generated_advs=self.adv_result,
                                                     origin_images=self.origin_adv_result,
                                                     target_label=self.target_label,  # is a number
                                                     origin_label=self.origin_label,  # is a number
                                                     origin_labels=result_origin_labels_vector,  # one-hot vector
                                                     step=actual_recover_step,
                                                     num_class=self.num_class,
                                                     ranking_type=ranking_strategy,
                                                     batch_size=attack_config.batch_to_optimize,
                                                     epoch_to_optimize=attack_config.epoch_to_optimize,
                                                     is_untargeted=self.is_untargeted()
                                                     )
            # self.optimized_adv = self.optimized_adv_0_255 / 255.
            # NOTE(review): despite the _0_255 name, the result is stored
            # unscaled (the /255. rescale above is commented out) -- confirm
            # the value range against optimize_advs.
            self.optimized_adv = np.array(self.optimized_adv_0_255)
            np.save(self.optimized_adv_path, self.optimized_adv)
            self.optimized_adv = np.asarray(self.optimized_adv).reshape(self.adv_result.shape)
            self.L0_afters, self.L2_afters, self.SSIM_afters = compute_distance(self.optimized_adv,
                                                                                self.origin_adv_result)
            logger.debug(self.shared_log + 'optimizing generated advs DONE')
        else:
            self.optimized_adv = None
            self.optimized_adv_0_255 = None
        self.end_time = time.time()
        np.save(self.adv_result_path, self.adv_result)
        np.save(self.origin_adv_result_path, self.origin_adv_result)

    def export_result(self, end_text=''):
        """Write a human-readable summary of the attack to the summary file.

        Runs :meth:`attack` first if no results exist yet. Also exports the
        clean / non-optimized / optimized images as image files when the data
        range allows it (``is_data_inside_0_1_or_0_255``).

        :param end_text: extra text appended at the end of the summary.
        """
        # Per-run export folder names, tagged with the shared timestamp.
        origin_folder = '/origin_' + 'originClass=' + str(self.origin_label) + '_predictedClass=' + str(
            self.origin_label) + '_timestamp=' + self.shared_time_stamp
        optimized_adv_folder = '/optimized_adv_' + 'originClass=' + str(self.origin_label) + '_predictedClass=' + str(
            self.target_label) + '_timestamp=' + self.shared_time_stamp
        non_optimized_adv_folder = '/non_optimized_adv_' + 'originClass=' + str(
            self.origin_label) + '_predictedClass=' + str(
            self.target_label) + '_timestamp=' + self.shared_time_stamp
        if self.adv_result is None:
            self.attack()
        logger.debug(self.shared_log + 'exporting results')

        # Build the configuration section of the report.
        result = '<=======' + '\n'
        result += 'Configuration:\n'
        result += 'Attack type: '
        result += ATTACK_BLACKBOX_TYPE if self.substitute_classifier is not None else ATTACK_WHITEBOX_TYPE
        result += '\n'
        result += '\tClassifier name: ' + str(self.classifier_name) + '\n'
        result += '\tOriginal label: ' + str(self.origin_label) + '\n'
        if self.is_untargeted():
            result += '\tTarget label: None (untargeted attack)' + '\n'
        else:
            result += '\tTarget label: ' + str(self.target_label) + '\n'
        result += '\tRecover speed: ' + str(self.step_to_recover) + '\n'
        result += '\tWeight: ' + str(self.weight) + '\n'
        result += '\tNumber data to train autoencoder: ' + str(self.num_images_to_train) + '\n'
        if self.num_images_to_attack == -1:
            result += '\tNumber data to attack: ALL \n'
        else:
            result += '\tNumber data to attack: ' + str(self.num_images_to_attack) + '\n'
        # NOTE(review): the next two lines have no trailing '\n', so these two
        # entries run together on one line in the report -- probably an
        # oversight, preserved as-is.
        result += '\tUse optimize phase: ' + str(self.use_optimize_phase)
        result += '\tQuality loss: ' + str(self.quality_loss_str)
        result += '\n'
        result += 'Attack result:\n'
        if self.adv_result is not None and self.adv_result.shape[0] != 0:
            if self.is_data_inside_0_1_or_0_255:
                save_images_to_folder(folder_path=self.image_folder + non_optimized_adv_folder,
                                      images=self.adv_result,
                                      prefix_file_name='non_optimized_adv', logger=logger)
            result += f'\tSuccess rate: {(self.adv_result.shape[0] / self.num_images_to_attack) * 100.: .2f}%\n'
            result += '\tL0 distance non-optimized (min/max/avg): ' + '{min_l0}/ {max_l0}/ {avg_l0}'.format(
                min_l0=np.min(self.L0_befores), max_l0=np.max(self.L0_befores),
                avg_l0=round(np.average(np.round(self.L0_befores, 2)), 2))
            result += '\n'
            if self.use_optimize_phase is True:
                result += '\tL0 distance optimized (min/max/avg): ' + '{min_l0}/ {max_l0}/ {avg_l0}'.format(
                    min_l0=np.min(self.L0_afters), max_l0=np.max(self.L0_afters),
                    avg_l0=round(np.average(self.L0_afters), 2))
                result += '\n'
            result += '\tL2 distance non-optimized (min/max/avg): ' + \
                      f'{np.round(np.min(self.L2_befores), 2):.2f}/ {round(np.max(self.L2_befores), 2):.2f}/ {round(np.average(self.L2_befores), 2):.2f}'
            result += '\n'
            if self.use_optimize_phase is True:
                result += '\tL2 distance optimized (min/max/avg): ' + f'{np.min(self.L2_afters):.2f}/ {round(np.max(self.L2_afters), 2):.2f}/ {round(np.average(self.L2_afters), 2):.2f}'
                result += '\n'
            result += '\tSSIM distance non-optimized (min/max/avg): ' + \
                      f'{np.round(np.min(self.SSIM_befores), 2):.2f}/ {round(np.max(self.SSIM_befores), 2):.2f}/ {round(np.average(self.SSIM_befores), 2):.2f}'
            result += '\n'
            if self.use_optimize_phase is True:
                result += '\tSSIM distance optimized (min/max/avg): ' + f'{np.min(self.SSIM_afters):.2f}/ {round(np.max(self.SSIM_afters), 2):.2f}/ {round(np.average(self.SSIM_afters), 2):.2f}'
                result += '\n'
            if self.use_optimize_phase is True:
                if self.is_data_inside_0_1_or_0_255:
                    save_images_to_folder(folder_path=self.image_folder + optimized_adv_folder,
                                          images=self.optimized_adv_0_255,
                                          prefix_file_name='optimized_adv', logger=logger)
            result += f'\tExecution time: {np.round(self.end_time - self.start_time, 2):.2f} seconds'
            result += '\n'
            result += '\tAutoencoder path: ' + str(self.autoencoder_file_path) + '\n'
            result += '\tAdv path: ' + str(self.adv_result_path) + '\n'
            result += '\tOptimized adv path: ' + str(self.optimized_adv_path) + '\n'
            result += '\tOriginal data path: ' + str(self.origin_adv_result_path) + '\n'
            save_images_to_folder(folder_path=self.image_folder + origin_folder,
                                  images=self.origin_adv_result,
                                  prefix_file_name='origin', logger=logger)
        else:
            result += '\tSuccess rate: 0%' + '\n'
            result += f'\tExecution time: {np.round(self.end_time - self.start_time, 2):.2f} seconds'
            result += '\n'
        result += '=======>\n'
        result += end_text
        write_to_file(content=result, path=self.summary_path)
        logger.debug(self.shared_log + 'exporting results DONE!')
        logger.info(f'Please open file: {self.summary_path} to view results!')

    def plot_some_random_images_v2(self, max_images=10):
        """Save one figure per adversarial example (up to ``max_images``),
        showing the clean image next to its adversarial counterpart(s).
        """
        # each figure contain a pair of clean and adv
        # Fall back to the non-optimized advs when no optimization phase ran.
        tmp_optimized = self.optimized_adv if self.use_optimize_phase is True else self.adv_result
        for idx in range(0, max_images):
            if idx >= len(self.adv_result):
                break
            # Predicted labels shown in the subplot titles; [np.newaxis, ...]
            # adds the batch dimension expected by the classifier.
            clean_label = np.argmax(self.classifier.predict(self.origin_adv_result[idx][np.newaxis, ...]), axis=1)
            non_optimized_adv_label = np.argmax(self.classifier.predict(self.adv_result[idx][np.newaxis, ...]),
                                                axis=1)
            if self.use_optimize_phase:
                optimized_adv_label = np.argmax(self.classifier.predict(tmp_optimized[idx][np.newaxis, ...]),
                                                axis=1)
                utils.show_three_images_3D(x_28_28_left=self.origin_adv_result[idx],
                                           x_28_28_mid=self.adv_result[idx],
                                           x_28_28_right=tmp_optimized[idx],
                                           left_title=f'clean\n(label = {clean_label})',
                                           mid_title=f'non-optimized adv\n(label = {non_optimized_adv_label})',
                                           right_title=f'optimized adv\n(label = {optimized_adv_label})',
                                           display=False,
                                           path=self.image_path.replace(".png", "") + str(idx) + ".png")
            else:
                utils.show_two_images_3D(x_28_28_left=self.origin_adv_result[idx],
                                         x_28_28_right=self.adv_result[idx],
                                         left_title=f'clean\n(label = {clean_label})',
                                         right_title=f'non-optimized adv\n(label = {non_optimized_adv_label})',
                                         display=False,
                                         path=self.image_path.replace(".png", "") + str(idx) + ".png")

    def plot_some_random_images(self):
        """Save a single 3x3 figure: three random examples as rows, with
        columns origin / adv / optimized adv. No-op when there are no results
        or the data range is not image-like.
        """
        # multiple images in a figure
        if self.adv_result is None or len(self.adv_result) == 0:
            return
        if not self.is_data_inside_0_1_or_0_255:
            return
        fig = plt.figure(figsize=(8, 8))
        columns = 3
        rows = 3
        images = []
        tmp_optimized = self.optimized_adv if self.use_optimize_phase is True else self.adv_result
        # Pick one random example per row (duplicates are possible).
        ramdom_indexs = [random.randint(0, tmp_optimized.shape[0] - 1) for i in range(rows)]
        for i in ramdom_indexs:
            images.append(self.origin_adv_result[i])
            images.append(self.adv_result[i])
            images.append(tmp_optimized[i])
        for i in range(1, columns * rows + 1):
            img = images[i - 1]
            ax = plt.subplot(rows, columns, i)
            # Column -> title: 1 = origin, 2 = adv, 3 = optimized adv.
            if i % rows == 1:
                title = 'origin'
            elif i % rows == 2:
                title = 'adv'
            else:
                title = 'optimized adv'
                if self.use_optimize_phase is False:
                    title += '(not use)'
                    # Show a blank placeholder since no optimized adv exists.
                    img = np.ones_like(img)
            # Single-channel images are squeezed to 2-D for imshow.
            if img.shape[-1] == 1:
                plt.imshow(img.reshape(img.shape[:-1]), cmap='gray')
            else:
                plt.imshow(img, cmap='gray')
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)
            # Only the first row carries column titles.
            if i < 4:
                ax.set_title(title)
        fig.savefig(self.image_path)
        plt.close(fig)


if __name__ == '__main__':
    # Standalone helper: preprocess MNIST and cache the training arrays as
    # .npy files. Note that only the training split is saved; the test split
    # is preprocessed but unused here.
    logger.debug('pre-processing data')
    (trainX, trainY), (testX, testY) = tf.keras.datasets.mnist.load_data()
    trainX, trainY = MnistPreprocessing.quick_preprocess_data(trainX, trainY,
                                                              num_classes=MNIST_NUM_CLASSES,
                                                              rows=MNIST_IMG_ROWS,
                                                              cols=MNIST_IMG_COLS,
                                                              chl=MNIST_IMG_CHL)
    testX, testY = MnistPreprocessing.quick_preprocess_data(testX, testY,
                                                            num_classes=MNIST_NUM_CLASSES,
                                                            rows=MNIST_IMG_ROWS,
                                                            cols=MNIST_IMG_COLS,
                                                            chl=MNIST_IMG_CHL)
    np.save('../../data/mnist/mnist_training.npy', trainX)
    np.save('../../data/mnist/mnist_label.npy', trainY)
Editor is loading...