# Untitled
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# NOTE: scikit-learn and matplotlib are imported inside the functions that
# use them (generate_data / evaluate_model) so that the network, the loss and
# the training loop stay importable when only torch is installed.


# Step 1: Generate synthetic data
def generate_data():
    """Create the two-blob toy dataset and split it into train/test parts.

    ``make_blobs`` draws 1000 two-dimensional points around two centres:
    (0, 0) with std 1.0 (label 0, "normal") and (5, 5) with std 0.5
    (label 1, "anomaly").

    Returns:
        ``X_train, X_test, y_train, y_test`` as produced by
        ``train_test_split`` with ``test_size=0.2`` and ``random_state=42``.
    """
    from sklearn.datasets import make_blobs
    from sklearn.model_selection import train_test_split

    X, y = make_blobs(
        n_samples=1000,
        centers=[[0, 0], [5, 5]],
        cluster_std=[1.0, 0.5],
        random_state=42,
    )
    y = (y == 1).astype(int)  # Label anomalies as 1, normal as 0
    return train_test_split(X, y, test_size=0.2, random_state=42)


# Step 2: Define a simple neural network
class AnomalyScoringNetwork(nn.Module):
    """Two-layer MLP that maps each input row to one unbounded score."""

    def __init__(self, input_dim):
        """Build the ``input_dim -> 16 -> 1`` scoring network."""
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
        )

    def forward(self, x):
        """Return the scores for ``x``; the last dimension has size 1."""
        return self.fc(x)


# Step 3: Define the deviation loss function
def deviation_loss(anomaly_scores, reference_score, y, threshold=5.0, sigma=1.0):
    """Compute the deviation loss for a batch of anomaly scores.

    Normal samples are penalised by the absolute deviation of their score
    from the reference; anomalies are penalised by how far their deviation
    falls short of ``threshold`` (hinge, zero once the margin is reached).
    Both terms are averaged over the whole batch.

    Args:
        anomaly_scores: Predicted anomaly scores.
        reference_score: Reference score the deviation is measured from
            (the training loop passes the mean score of the normal samples).
        y: True labels (0 for normal, 1 for anomaly), same shape as
            ``anomaly_scores``.
        threshold: Z-score margin anomalies should exceed.
        sigma: Scale the deviation is divided by. Defaults to 1.0, which
            reproduces the previously hard-coded behaviour.

    Returns:
        Scalar tensor: ``loss_normal + loss_anomaly``.

    Raises:
        ValueError: If ``sigma`` is not strictly positive.
    """
    if sigma <= 0:
        raise ValueError("sigma must be positive")
    deviation = (anomaly_scores - reference_score) / sigma
    loss_normal = torch.mean((1 - y) * torch.abs(deviation))
    loss_anomaly = torch.mean(y * torch.clamp(threshold - deviation, min=0))
    return loss_normal + loss_anomaly


# Step 4: Training the model
def train_model(model, X_train, y_train, num_epochs=50, batch_size=32, learning_rate=0.01):
    """Train ``model`` with Adam on the deviation loss.

    Args:
        model: Network returning one score per row (output shape ``(N, 1)``).
        X_train: Training features, array-like of shape ``(N, input_dim)``.
        y_train: Training labels, array-like of shape ``(N,)`` with 0 for
            normal and 1 for anomaly.
        num_epochs: Number of passes over the data.
        batch_size: Mini-batch size.
        learning_rate: Adam learning rate.

    Returns:
        List with the summed batch loss of every epoch (the same number that
        is printed for that epoch).
    """
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = deviation_loss
    dataset = torch.utils.data.TensorDataset(
        torch.as_tensor(X_train, dtype=torch.float32),
        torch.as_tensor(y_train, dtype=torch.float32),
    )
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    epoch_losses = []
    # Latest reference estimate, reused for batches without normal samples.
    last_reference = None
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        for X_batch, y_batch in dataloader:
            optimizer.zero_grad()
            # squeeze(-1) keeps the batch dimension; a bare squeeze() turns a
            # final batch of one sample into a 0-d tensor and breaks the
            # boolean indexing below.
            anomaly_scores = model(X_batch).squeeze(-1)
            normal_mask = y_batch == 0
            if normal_mask.any():
                reference_score = anomaly_scores[normal_mask].mean()
                last_reference = reference_score.detach()
            else:
                # The mean of an empty selection is NaN and would poison every
                # weight on the next step; fall back to the last estimate, or
                # to 0 if no normal sample has been seen yet.
                if last_reference is None:
                    last_reference = anomaly_scores.new_zeros(())
                reference_score = last_reference
            loss = criterion(anomaly_scores, reference_score, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss:.4f}")
        epoch_losses.append(total_loss)
    return epoch_losses


# Step 5: Evaluate the model
def evaluate_model(model, X_test, y_test):
    """Score the test set, print the AUC-ROC and show the ROC curve.

    Args:
        model: Trained scoring network.
        X_test: Test features, array-like of shape ``(N, input_dim)``.
        y_test: Test labels (0 for normal, 1 for anomaly).

    Returns:
        The AUC-ROC of the anomaly scores against ``y_test``.
    """
    import matplotlib.pyplot as plt
    from sklearn.metrics import roc_auc_score, roc_curve

    model.eval()
    with torch.no_grad():
        features = torch.as_tensor(X_test, dtype=torch.float32)
        anomaly_scores = model(features).squeeze(-1).numpy()
    auc = roc_auc_score(y_test, anomaly_scores)
    print(f"AUC-ROC: {auc:.4f}")

    # Step 6: Plot the ROC curve
    fpr, tpr, _ = roc_curve(y_test, anomaly_scores)
    plt.figure()
    plt.plot(fpr, tpr, label=f"ROC curve (AUC = {auc:.4f})")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("ROC Curve")
    plt.legend(loc="best")
    plt.grid(True)
    plt.show()
    return auc


def main():
    """Generate the data, train the network and evaluate it."""
    X_train, X_test, y_train, y_test = generate_data()
    input_dim = X_train.shape[1]
    model = AnomalyScoringNetwork(input_dim)
    train_model(model, X_train, y_train)
    evaluate_model(model, X_test, y_test)


# Run the training and evaluation
if __name__ == "__main__":
    main()
# Leave a Comment