Untitled

 avatar
unknown
plain_text
a month ago
3.3 kB
6
Indexable
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, roc_curve
import matplotlib.pyplot as plt

# Step 1: Generate synthetic data
def generate_data():
    """Build a two-blob toy dataset and return its train/test split.

    Draws 1000 two-dimensional points from two Gaussian blobs: one centred
    at (0, 0) with standard deviation 1.0 and one centred at (5, 5) with
    standard deviation 0.5. Points from the second blob are labelled 1
    (anomaly); points from the first are labelled 0 (normal).

    Returns:
        The list produced by train_test_split with a 20% test fraction, in
        the order X_train, X_test, y_train, y_test. Both the sampling and
        the split use seed 42.
    """
    features, blob_ids = make_blobs(
        n_samples=1000,
        centers=[[0, 0], [5, 5]],
        cluster_std=[1.0, 0.5],
        random_state=42,
    )
    # Blob index 1 is the cluster around (5, 5); it plays the anomaly class.
    labels = (blob_ids == 1).astype(int)
    return train_test_split(features, labels, test_size=0.2, random_state=42)

# Materialize the split once at import time; the training and evaluation calls
# at the bottom of the script read these module-level arrays.
X_train, X_test, y_train, y_test = generate_data()

# Step 2: Define a simple neural network
class AnomalyScoringNetwork(nn.Module):
    """Small MLP that maps a feature vector to one scalar anomaly score.

    Architecture: Linear(input_dim, hidden_dim) -> ReLU -> Linear(hidden_dim, 1).
    The layers live in ``self.fc`` (an ``nn.Sequential``), so parameter names
    in the state dict are ``fc.0.*`` and ``fc.2.*``.

    Args:
        input_dim: Number of input features per sample.
        hidden_dim: Width of the single hidden layer. Defaults to 16, the
            value that was previously hard-coded.
    """

    def __init__(self, input_dim, hidden_dim=16):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, x):
        """Return the raw (unbounded) scores; the last dimension has size 1."""
        return self.fc(x)

# Step 3: Define the deviation loss function
def deviation_loss(anomaly_scores, reference_score, y, threshold=5.0, sigma=1.0):
    """Compute the deviation loss for a batch of anomaly scores.

    Each score is converted to a z-score-like deviation
    ``(score - reference_score) / sigma``. Normal samples (y == 0) are
    penalised by the absolute deviation, pulling them toward the reference.
    Anomalies (y == 1) are penalised by ``max(0, threshold - deviation)``,
    pushing them at least ``threshold`` deviations above the reference.

    Args:
        anomaly_scores: Tensor of predicted anomaly scores.
        reference_score: Reference score the deviations are measured from
            (scalar or tensor broadcastable against ``anomaly_scores``).
        y: Labels with the same shape as ``anomaly_scores``
            (0 for normal, 1 for anomaly).
        threshold: Margin, in units of ``sigma``, that anomalies must exceed.
        sigma: Scale used to normalise the deviation; must be non-zero.
            Defaults to 1.0, the value that was previously hard-coded.

    Returns:
        Scalar tensor: mean normal-term loss plus mean anomaly-term loss,
        each averaged over every element of the batch.

    Raises:
        ValueError: If ``y`` and ``anomaly_scores`` differ in shape.
    """
    y = torch.as_tensor(y, dtype=anomaly_scores.dtype, device=anomaly_scores.device)
    # Without this guard, (N, 1) scores against (N,) labels broadcast to an
    # (N, N) matrix and silently yield a wrong loss instead of an error.
    if y.shape != anomaly_scores.shape:
        raise ValueError(
            f"y shape {tuple(y.shape)} does not match "
            f"anomaly_scores shape {tuple(anomaly_scores.shape)}"
        )

    deviation = (anomaly_scores - reference_score) / sigma
    loss_normal = torch.mean((1 - y) * torch.abs(deviation))
    loss_anomaly = torch.mean(y * torch.clamp(threshold - deviation, min=0))
    return loss_normal + loss_anomaly

# Step 4: Training the model
def train_model(model, X_train, y_train, num_epochs=50, batch_size=32, learning_rate=0.01):
    """Train ``model`` in place with Adam and the deviation loss.

    For every mini-batch the reference score is the mean predicted score of
    that batch's normal samples (label 0). Gradients flow through the
    reference as well as through the individual scores.

    Args:
        model: Scoring network; expected to return one score per sample with
            a trailing dimension of size 1.
        X_train: Training features (array-like convertible to a float tensor).
        y_train: Training labels (0 for normal, 1 for anomaly).
        num_epochs: Number of passes over the training data.
        batch_size: Mini-batch size used by the shuffled DataLoader.
        learning_rate: Adam learning rate.

    Side effects:
        Updates the model parameters and prints the summed batch loss once
        per epoch.
    """
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = deviation_loss

    features = torch.as_tensor(X_train, dtype=torch.float32)
    labels = torch.as_tensor(y_train, dtype=torch.float32)
    dataset = torch.utils.data.TensorDataset(features, labels)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        for X_batch, y_batch in dataloader:
            normal_mask = y_batch == 0
            if not normal_mask.any():
                # With no normal samples the reference would be the mean of an
                # empty tensor, i.e. NaN; one such step would turn every
                # weight into NaN. Skip the batch instead.
                continue

            optimizer.zero_grad()
            # squeeze(-1) removes only the score axis. A bare squeeze() would
            # also drop the batch axis of a size-1 batch and break the
            # boolean-mask indexing below.
            anomaly_scores = model(X_batch).squeeze(-1)
            reference_score = torch.mean(anomaly_scores[normal_mask])
            loss = criterion(anomaly_scores, reference_score, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss:.4f}")

# Step 5: Evaluate the model
def evaluate_model(model, X_test, y_test):
    """Score the test set, print its AUC-ROC, and plot the ROC curve.

    Args:
        model: Trained scoring network; switched to eval mode and called on a
            float32 tensor built from ``X_test``.
        X_test: Test features (array-like convertible to a float tensor).
        y_test: Ground-truth binary labels (0 for normal, 1 for anomaly).

    Returns:
        The AUC-ROC value computed by ``roc_auc_score``. Earlier versions
        returned nothing, so existing callers that ignore the result are
        unaffected.

    Side effects:
        Prints the AUC and opens a matplotlib figure via ``plt.show()``.
    """
    model.eval()
    with torch.no_grad():
        inputs = torch.as_tensor(X_test, dtype=torch.float32)
        # squeeze(-1) drops only the trailing score axis, so a single-sample
        # test set still yields a 1-D array rather than a 0-d scalar.
        anomaly_scores = model(inputs).squeeze(-1).cpu().numpy()
    auc = roc_auc_score(y_test, anomaly_scores)
    print(f"AUC-ROC: {auc:.4f}")

    # Step 6: Plot the ROC curve
    fpr, tpr, _ = roc_curve(y_test, anomaly_scores)
    plt.figure()
    plt.plot(fpr, tpr, label=f"ROC curve (AUC = {auc:.4f})")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("ROC Curve")
    plt.legend(loc="best")
    plt.grid(True)
    plt.show()

    return auc

# Run the training and evaluation.
# The network's input width is the feature count of the training matrix
# (its second dimension).
input_dim = X_train.shape[1]
model = AnomalyScoringNetwork(input_dim)
# Fit with train_model's default hyperparameters, then report AUC-ROC and plot
# the ROC curve on the held-out split.
train_model(model, X_train, y_train)
evaluate_model(model, X_test, y_test)
Leave a Comment