# NOTE: the following header was paste-site metadata, not code — kept as a comment:
#   Untitled / unknown / plain_text / 5 years ago / 8.6 kB / 7 / Indexable
from matplotlib import pyplot as plt
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers as layers
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras import losses
import numpy as np
import os
from keras.layers import LeakyReLU
# tf.random.set_seed(100)
# np.random.seed(100)

# --- experiment configuration -------------------------------------------
latent_dim = 64          # size of the autoencoder bottleneck vector
noise_sigma = 0.35       # std-dev of Gaussian noise for denoising-AE training
train_AE = False         # True: train the AE; False: load saved weights
TASK2 = False
TASK3 = True
TASK4 = False
TRAIN_GAN = True
TRAIN_CONDITIONAL_GAN = False
sml_train_size = 50      # number of labelled examples used in task 2

# --- data: MNIST scaled to [0, 1], zero-padded from 28x28 to 32x32x1 -----
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0
x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0
pad_spec = ((0, 0), (2, 2), (2, 2), (0, 0))  # pad H and W by 2 on each side
x_train = np.pad(x_train, pad_spec)
x_test = np.pad(x_test, pad_spec)
print(x_train.shape)
print(x_test.shape)
# one-hot labels for the classification tasks
y_train = keras.utils.to_categorical(y_train, num_classes=10, dtype='float32')
y_test = keras.utils.to_categorical(y_test, num_classes=10, dtype='float32')
# Encoder: 32x32x1 image -> latent_dim vector via strided-conv downsampling
# (32 -> 16 -> 8 -> 4 -> 2 spatial, then flattened 2*2*96 -> latent_dim).
encoder = Sequential([
    layers.Conv2D(16, (4, 4), strides=(2, 2), activation='relu', padding='same', input_shape=(32, 32, 1)),
    layers.Conv2D(32, (3, 3), strides=(2, 2), activation='relu', padding='same'),
    layers.Conv2D(64, (3, 3), strides=(2, 2), activation='relu', padding='same'),
    layers.Conv2D(96, (3, 3), strides=(2, 2), activation='relu', padding='same'),
    layers.Reshape((2 * 2 * 96,)),
    layers.Dense(latent_dim),
])

# Decoder: latent_dim vector -> 32x32x1 image via transposed-conv upsampling;
# sigmoid output matches the [0, 1] pixel range.
decoder = Sequential([
    layers.Dense(2 * 2 * 96, activation='relu', input_shape=(latent_dim,)),
    layers.Reshape((2, 2, 96)),
    layers.Conv2DTranspose(64, (3, 3), strides=(2, 2), activation='relu', padding='same'),
    layers.Conv2DTranspose(32, (3, 3), strides=(2, 2), activation='relu', padding='same'),
    layers.Conv2DTranspose(16, (4, 4), strides=(2, 2), activation='relu', padding='same'),
    layers.Conv2DTranspose(1, (4, 4), strides=(2, 2), activation='sigmoid', padding='same'),
])

# End-to-end autoencoder: decoder applied to the encoder's output.
autoencoder = keras.Model(encoder.inputs, decoder(encoder.outputs))
autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
checkpoint_path = "model_save/cp.ckpt"
if train_AE:
    # Denoising-AE training: Gaussian-corrupted inputs, clean targets.
    # Weights are checkpointed every epoch so the else-branch can reload them.
    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                     save_weights_only=True)
    autoencoder.fit(x_train + noise_sigma * np.random.randn(*x_train.shape), x_train,
                    epochs=15,
                    batch_size=128,
                    shuffle=True,
                    validation_data=(x_test, x_test),
                    callbacks=[cp_callback])
else:
    autoencoder.load_weights(checkpoint_path)

# Reconstruct the test set going through encoder/decoder explicitly so the
# latent codes are available for later tasks. (The original also ran
# autoencoder.predict(x_test) first and immediately overwrote the result —
# a redundant full forward pass, removed here.)
latent_codes = encoder.predict(x_test)
decoded_imgs = decoder.predict(latent_codes)
# Show n originals (top row) against their AE reconstructions (bottom row).
# Indentation of the loop body was lost in the paste and is restored here.
n = 10
plt.figure(figsize=(20, 4))
for i in range(1, n + 1):
    # Display original
    ax = plt.subplot(2, n, i)
    plt.imshow(x_test[i].reshape(32, 32))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    # Display reconstruction
    ax = plt.subplot(2, n, i + n)
    plt.imshow(decoded_imgs[i].reshape(32, 32))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
# Your code starts here:
if TASK2:
    # Task 2, part 1: small MLP classifier trained on the frozen encoder's
    # latent codes, using only sml_train_size labelled examples.
    classifier = Sequential()
    classifier.add(layers.Dense(128, activation='relu', input_shape=(latent_dim,)))
    classifier.add(layers.Dense(128, activation='relu'))
    classifier.add(layers.Dense(64, activation='relu'))
    classifier.add(layers.Dense(10, activation='softmax'))
    train_codes = encoder.predict(x_train[:sml_train_size])
    test_codes = encoder.predict(x_test)
    classifier.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, min_delta=0.001)
    classifier.fit(train_codes, y_train[:sml_train_size],
                   epochs=200,
                   batch_size=16,
                   shuffle=True,
                   validation_data=(test_codes, y_test))

    # Task 2, part 2 (baseline): same encoder+classifier architecture trained
    # end-to-end from scratch — clone_model copies architecture only, so the
    # clones start from freshly initialised weights.
    full_cls_enc = keras.models.clone_model(encoder)
    full_cls_cls = keras.models.clone_model(classifier)
    full_cls = keras.Model(full_cls_enc.inputs, full_cls_cls(full_cls_enc.outputs))
    full_cls.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    full_cls.fit(x_train[:sml_train_size], y_train[:sml_train_size],
                 epochs=100,
                 batch_size=16,
                 shuffle=True,
                 validation_data=(x_test, y_test))
# Task 3 Functions:
def get_gen_model(noise_shape, latent_dim):
    """Build the generator MLP: Gaussian noise vector -> AE latent-space point.

    :param noise_shape: dimensionality of the input noise vector
    :param latent_dim: dimensionality of the produced latent code
    :return: an uncompiled keras Sequential model
    """
    generator = Sequential()
    generator.add(layers.Dense(1024, activation=LeakyReLU(alpha=0.2), input_shape=(noise_shape,)))
    generator.add(layers.Dense(1024, activation=LeakyReLU(alpha=0.2)))
    generator.add(layers.Dense(512, activation=LeakyReLU(alpha=0.2)))
    # Linear output: AE latent codes are unbounded, so no activation here.
    generator.add(layers.Dense(latent_dim))
    return generator
def get_disc_model(latent_dim):
    """Build and compile the discriminator MLP.

    Maps a latent vector to the probability that it is a real encoder code
    (sigmoid output, binary cross-entropy).

    :param latent_dim: dimensionality of the input latent vector
    :return: a compiled keras Sequential model
    """
    discriminator = Sequential()
    discriminator.add(layers.Dense(128, activation=LeakyReLU(alpha=0.2), input_shape=(latent_dim,)))
    discriminator.add(layers.Dropout(0.4))
    discriminator.add(layers.Dense(128, activation=LeakyReLU(alpha=0.2)))
    discriminator.add(layers.Dropout(0.4))
    discriminator.add(layers.Dense(1, activation="sigmoid"))
    # 'learning_rate' replaces the deprecated 'lr' keyword (removed in newer tf.keras).
    opt = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
    discriminator.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
    discriminator.summary()
    return discriminator
def build_gan(gen, disc):
    """Stack generator + frozen discriminator for the generator training step.

    :param gen: generator model (trainable)
    :param disc: compiled discriminator; frozen here so only the generator's
        weights update when the stacked model is trained
    :return: the compiled stacked GAN model
    """
    disc.trainable = False
    GAN = Sequential()
    GAN.add(gen)
    GAN.add(disc)
    # 'learning_rate' replaces the deprecated 'lr' keyword (removed in newer tf.keras).
    opt = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
    GAN.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
    GAN.summary()
    return GAN
def GAN_train(epoch_num, batch_size, noise_size):
    """Adversarially train a generator to mimic the AE's latent distribution.

    Per step: the discriminator trains on a half-real (encoder codes) /
    half-fake (generator output) batch, then the stacked GAN trains the
    generator to make the frozen discriminator output "real".

    :param epoch_num: number of passes over the training set
    :param batch_size: combined discriminator batch size (half real, half fake)
    :param noise_size: dimensionality of the generator's input noise
    :return: the trained stacked GAN model (generator is its first layer)
    """
    batch_per_epoch = x_train.shape[0] // batch_size
    print(batch_per_epoch)
    gen = get_gen_model(noise_size, latent_dim)
    disc = get_disc_model(latent_dim)
    GAN = build_gan(gen, disc)
    half_batch = batch_size // 2  # hoisted loop-invariant
    for i in range(epoch_num):
        for j in range(batch_per_epoch):
            # real samples: latent codes of randomly chosen training images
            real_samples = encoder.predict(tf.random.shuffle(x_train)[:half_batch])
            # fake samples: generator output on Gaussian noise
            random_samples = tf.random.normal(shape=[half_batch, noise_size])
            fake_samples = gen.predict(random_samples)
            X = np.vstack([real_samples, fake_samples])
            y = np.vstack([np.ones(shape=[half_batch, 1]), np.zeros(shape=[half_batch, 1])])
            disc_loss, disc_acc = disc.train_on_batch(X, y)
            # generator step: label pure-noise batches as "real" through the
            # frozen discriminator so gradients improve the generator
            random_sample = tf.random.normal(shape=[batch_size, noise_size])
            labels = np.ones(batch_size)
            gan_loss, _ = GAN.train_on_batch(random_sample, labels)
        # end-of-epoch report uses the last batch's losses
        print("\nEPOCH {} gan_loss = {}, disc_loss = {}, disc_acc = {}".format(i + 1, gan_loss, disc_loss, disc_acc))
        # visual sanity check: decode a few generated latent codes
        var1 = gen(tf.ones([1, noise_size]))
        var2 = gen(tf.zeros([1, noise_size]))
        var3 = gen(tf.random.uniform(shape=[1, noise_size]))
        im = decoder.predict(var1)
        im1 = decoder.predict(var2)  # NOTE(review): decoded but never displayed
        im2 = decoder.predict(var3)  # NOTE(review): decoded but never displayed
        fig1 = plt.figure()
        plt.imshow(tf.squeeze(im, [0, 3]), cmap="gray")
        plt.show()
    return GAN
def GAN_interpolate(GAN=None):
    """Interpolation task — unimplemented stub.

    Presumably intended to interpolate between generated digits both in the
    AE latent space and in the GAN's noise space — TODO confirm and implement.
    `GAN` defaults to None so the existing no-argument call site still works.

    :param GAN: trained GAN model to interpolate with (optional for now)
    :return: (AE_interpolation, latent_interpolation) — placeholder 2-tuple so
        the caller's two-name unpacking does not raise.
    """
    # TODO: implement the interpolation.
    return None, None
# Task 3 Constants
noise_size = 64          # was np.array(64); a plain int is all the shape args need
batch_size = 256
epoch_num = 100
batch_per_epoch = 20     # NOTE(review): unused — GAN_train derives its own value

if TASK3:
    GAN_path = 'model_save/gan_cp'
    if TRAIN_GAN:
        GAN_model = GAN_train(epoch_num, batch_size, noise_size)
        GAN_model.save(GAN_path)
    else:
        GAN_model = tf.keras.models.load_model(GAN_path)
    # perform interpolation task — the original called GAN_interpolate() with
    # no argument although the function requires one; pass the model.
    AE_interpolation, latent_interpolation = GAN_interpolate(GAN_model)
# (trailing paste-site artifact removed: "Editor is loading...")