# cnn_classifier.py

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
from pathlib import Path
import os
import datetime
import time
from src import helper
from src.configurations import DefaultConfigurations
from src.resources import REPO_DIR, Language
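
# CWI_CNN: a convolutional classifier that consumes pre-computed embedding
# sequences of shape (sequence_length, embedding_dims) and outputs a softmax
# over num_classes labels. train() and evaluate() below are the entry points.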



class CWI_CNN(keras.Model):
    def __init__(
            self,
            sequence_length,
            num_classes,
            embedding_dims,
            filter_sizes,
            num_filters,
            l2_reg_lambda=0.0,
            lang=Language.French,
    ):
        super(CWI_CNN, self).__init__()
        self.lang = lang
        print(f"Number of classes: {num_classes}")
        print("Building model...")

        # A subclassed keras.Model defines its inputs in call(), so no explicit
        # Input layer is needed. The single Dropout instance below is reused
        # between the dense layers (Dropout is stateless, so sharing is safe).
        self.dropout = layers.Dropout(0.5)

        # One convolution + max-pool branch per filter size. The layers are
        # stored in parallel lists (rather than captured in lambdas) so that
        # Keras tracks their weights and each branch keeps its own layers.
        self.conv_layers = []
        self.pool_layers = []
        for filter_size in filter_sizes:
            self.conv_layers.append(layers.Conv2D(
                filters=num_filters,
                kernel_size=(filter_size, embedding_dims),
                strides=(1, 1),
                padding="valid",
                activation="relu",
                kernel_initializer="random_normal",
                bias_initializer="random_normal",
                name=f"conv-{filter_size}"
            ))
            self.pool_layers.append(layers.MaxPool2D(
                pool_size=(sequence_length - filter_size + 1, 1),
                strides=(1, 1),
                padding="valid",
                name=f"pool-{filter_size}"
            ))

        # Combine all the pooled features
        self.concat = layers.Concatenate(axis=3)
        self.flatten = layers.Flatten()

        # Dense head; the L2 penalty accepted by the constructor is applied here.
        l2_reg = keras.regularizers.l2(l2_reg_lambda) if l2_reg_lambda else None
        self.dense1 = layers.Dense(256, activation="relu", kernel_initializer="glorot_uniform",
                                   bias_initializer="random_normal", kernel_regularizer=l2_reg)
        self.dense2 = layers.Dense(64, activation="relu", kernel_initializer="glorot_uniform",
                                   bias_initializer="random_normal", kernel_regularizer=l2_reg)
        self.output_layer = layers.Dense(num_classes, activation="softmax", kernel_initializer="glorot_uniform",
                                         bias_initializer="random_normal", kernel_regularizer=l2_reg)

    def call(self, inputs, training=False):
        # Add a channel dimension: (batch, seq_len, emb_dims) -> (batch, seq_len, emb_dims, 1)
        x = tf.expand_dims(inputs, -1)
        pooled_outputs = [pool(conv(x)) for conv, pool in zip(self.conv_layers, self.pool_layers)]
        x = self.concat(pooled_outputs)
        x = self.flatten(x)
        x = self.dropout(x, training=training)
        x = self.dense1(x)
        x = self.dropout(x, training=training)
        x = self.dense2(x)
        x = self.dropout(x, training=training)
        return self.output_layer(x)
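
# Shape walk-through (illustrative; the concrete numbers are assumptions):
#
#     model = CWI_CNN(sequence_length=15, num_classes=2, embedding_dims=50,
#                     filter_sizes=[2, 3], num_filters=4)
#     probs = model(tf.zeros((8, 15, 50)))   # -> (8, 2) softmax probabilities
#
# Each Conv2D kernel spans the full embedding width, so after max-pooling over
# the remaining positions every branch yields a (batch, 1, 1, num_filters)
# tensor; concatenating on the channel axis and flattening gives a
# (batch, num_filters * len(filter_sizes)) vector for the dense head.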


@helper.print_execution_time
def run_training(out_dir, x_train, y_train, x_valid, y_valid, feature_args, lang, configs):
    print("x_train.shape:", x_train.shape)
    print("y_train.shape:", y_train.shape)

    helper.write_lines(feature_args, Path(out_dir) / 'features.txt')
    print("Writing to {}\n".format(out_dir))

    print("Generating model and starting training...")

    model = CWI_CNN(
        sequence_length=x_train.shape[1],
        num_classes=y_train.shape[1],
        embedding_dims=x_train.shape[2],
        filter_sizes=list(map(int, configs.FILTER_SIZES.split(","))),
        num_filters=configs.NUM_FILTERS,
        l2_reg_lambda=configs.L2_REG_LAMBDA,
        lang=lang
    )

    optimizer = keras.optimizers.Adam(learning_rate=configs.LEARNING_RATE)
    model.compile(optimizer=optimizer,
                  # The model ends in a softmax, so the loss receives probabilities, not logits.
                  loss=keras.losses.CategoricalCrossentropy(from_logits=False),
                  metrics=['accuracy'])

    # Callbacks
    checkpoint_dir = os.path.abspath(os.path.join(out_dir, "checkpoints"))
    checkpoint_prefix = os.path.join(checkpoint_dir, "model")
    os.makedirs(checkpoint_dir, exist_ok=True)

    cp_callback = keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_prefix,
        save_weights_only=True,
        save_best_only=True,
        monitor='val_accuracy',
        mode='max'
    )

    early_stopping = keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=configs.EARLY_STOPPING_PATIENCE,
        restore_best_weights=True
    )

    # TensorBoard callback
    log_dir = os.path.join(out_dir, "logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
    tensorboard_callback = keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

    # Train the model
    history = model.fit(
        x_train, y_train,
        batch_size=configs.BATCH_SIZE,
        epochs=configs.NUM_EPOCHS,
        validation_data=(x_valid, y_valid),
        callbacks=[cp_callback, early_stopping, tensorboard_callback]
    )

    # Save the entire model
    model.save(os.path.join(out_dir, "saved_model"))

    # Save validation scores
    val_scores = history.history['val_accuracy']
    with open(os.path.join(out_dir, "val_scores.txt"), "w") as scores_file:
        scores_file.write("\n".join(map(str, val_scores)))

    return out_dir.stem


def train(x_train, y_train, x_valid, y_valid, feature_args, lang=None, configs=None):
    # Instantiate the default configuration lazily to avoid a shared mutable default.
    if configs is None:
        configs = DefaultConfigurations()
    timestamp = str(int(time.time()))
    out_dir = REPO_DIR / f'models/{lang}/CNN/{timestamp}'
    out_dir.mkdir(parents=True, exist_ok=True)

    with helper.log_stdout(out_dir / 'logs.txt'):
        return run_training(out_dir, x_train, y_train, x_valid, y_valid, feature_args, lang, configs)
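
# Example invocation (illustrative; the array shapes and the "embeddings"
# feature label are assumptions, not part of this module):
#
#     x_train: float array of shape (n_samples, sequence_length, embedding_dims)
#     y_train: one-hot labels of shape (n_samples, num_classes)
#     model_id = train(x_train, y_train, x_valid, y_valid,
#                      feature_args=["embeddings"], lang=Language.French)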


def evaluate(x_test, y_test, x_test_sents, model_dir=None, features=None, output_dir=None, test_name=None, lang=None):
    p = Path(REPO_DIR / f'models/{lang}/CNN')
    dirs = sorted(p.iterdir(), key=lambda f: f.stat().st_mtime) if p.exists() else []

    if len(dirs) > 0:
        if model_dir is not None:
            checkpoint_dir = REPO_DIR / f'models/{lang}/CNN/{model_dir}'
        else:
            checkpoint_dir = dirs[-1]  # most recently trained model

        print(f"Checkpoint dir: {checkpoint_dir}")

        # Load the saved model
        model = keras.models.load_model(str(checkpoint_dir / "saved_model"))

        # Make predictions
        predictions = model.predict(x_test)
        all_predictions = np.argmax(predictions, axis=1)

        if output_dir is None:
            output_dir = checkpoint_dir

        Path(output_dir).mkdir(parents=True, exist_ok=True)
        model_name = checkpoint_dir.parent.stem + '_' + checkpoint_dir.stem

        y_test = np.argmax(y_test, axis=1)

        helper.save_evaluation_report(all_predictions, y_test, x_test_sents, output_dir, model_name, test_name,
                                      features)
        return all_predictions

    else:
        print("You haven't trained a model yet.")
        print("Run training script to train a model, e.g.,: python scripts/train_all.py")


# Thin wrapper around helper.save_evaluation_report for externally computed predictions.
def save_probabilities(predictions, y_test, x_test_sents, output_dir, model_name, test_name, features):
    helper.save_evaluation_report(predictions, y_test, x_test_sents, output_dir, model_name, test_name, features)
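

# A minimal runnable sketch (not part of the original training pipeline): it
# builds the model on random data to check that the forward pass and a single
# training step run end to end; shapes and hyper-parameters here are arbitrary.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    x = rng.standard_normal((8, 15, 50)).astype("float32")
    y = keras.utils.to_categorical(rng.integers(0, 2, size=8), num_classes=2)
    model = CWI_CNN(sequence_length=15, num_classes=2, embedding_dims=50,
                    filter_sizes=[2, 3], num_filters=4)
    model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
    model.fit(x, y, epochs=1, batch_size=4, verbose=0)
    print("Smoke test output shape:", model(x).shape)  # expected: (8, 2)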