# Untitled
# unknown
# plain_text
# 10 months ago
# 3.3 kB
# 9
# Indexable
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, roc_curve
import matplotlib.pyplot as plt
# Step 1: Generate synthetic data
def generate_data(n_samples=1000, test_size=0.2, random_state=42):
    """Create a two-cluster toy dataset and split it into train and test sets.

    Two Gaussian blobs are drawn with ``make_blobs``: one centred at (0, 0)
    with standard deviation 1.0 and one centred at (5, 5) with standard
    deviation 0.5.  Points belonging to the second blob are labelled 1
    (anomaly); points from the first blob are labelled 0 (normal).

    Args:
        n_samples: Total number of points to draw across both blobs.
        test_size: Size of the held-out split, forwarded unchanged to
            ``train_test_split``.
        random_state: Seed used for both blob generation and the split, so
            repeated calls with the same arguments give the same data.

    Returns:
        The ``X_train, X_test, y_train, y_test`` sequence produced by
        ``train_test_split``.
    """
    X, y = make_blobs(
        n_samples=n_samples,
        centers=[[0, 0], [5, 5]],
        cluster_std=[1.0, 0.5],
        random_state=random_state,
    )
    # Label anomalies (second blob) as 1 and normal points as 0.
    y = (y == 1).astype(int)
    return train_test_split(X, y, test_size=test_size, random_state=random_state)
# Build the train/test split once at module level; the training and
# evaluation calls at the bottom of the script read these four arrays.
X_train, X_test, y_train, y_test = generate_data()
# Step 2: Define a simple neural network
class AnomalyScoringNetwork(nn.Module):
    """Small feed-forward network that maps a feature vector to one score.

    The layer stack is Linear(input_dim, 16) -> ReLU -> Linear(16, 1) and is
    stored under the ``fc`` attribute.
    """

    def __init__(self, input_dim):
        """Build the scoring layers for inputs with ``input_dim`` features."""
        super().__init__()
        # Layers are created in forward order so parameter initialisation
        # consumes the random generator in the same sequence as the stack.
        hidden_layer = nn.Linear(input_dim, 16)
        activation = nn.ReLU()
        score_head = nn.Linear(16, 1)
        self.fc = nn.Sequential(hidden_layer, activation, score_head)

    def forward(self, x):
        """Return the raw score for each row of ``x`` (last dimension is 1)."""
        scores = self.fc(x)
        return scores
# Step 3: Define the deviation loss function
def deviation_loss(anomaly_scores, reference_score, y, threshold=5.0, reference_std=1.0):
    """Compute the deviation loss for a batch of anomaly scores.

    Each score is turned into a z-score-like deviation
    ``(score - reference_score) / reference_std``.  Normal samples (label 0)
    are penalised by the absolute deviation, pulling them towards the
    reference.  Anomalies (label 1) are penalised by
    ``max(0, threshold - deviation)``, pushing them at least ``threshold``
    deviations above the reference.

    Args:
        anomaly_scores: Tensor of predicted scores, one per sample.  Any
            shape holding one value per sample is accepted (e.g. ``(N,)`` or
            ``(N, 1)``); it is flattened internally.
        reference_score: Scalar (or broadcastable tensor) subtracted from
            every score, e.g. the mean score of the normal samples.
        y: Labels, 0 for normal and 1 for anomaly, one per sample.
        threshold: Minimum deviation anomalies should reach before their
            loss term becomes zero.
        reference_std: Scale the deviation is divided by.  Defaults to 1.0.

    Returns:
        Scalar tensor: mean normal-term loss plus mean anomaly-term loss,
        both averaged over all samples in the batch.
    """
    # Flatten scores and labels so a (N, 1) score column multiplied by (N,)
    # labels cannot silently broadcast into an (N, N) matrix.
    scores = anomaly_scores.reshape(-1)
    labels = torch.as_tensor(y, dtype=scores.dtype, device=scores.device).reshape(-1)
    if torch.is_tensor(reference_score) and reference_score.dim() > 0:
        reference_score = reference_score.reshape(-1)

    deviation = (scores - reference_score) / reference_std
    loss_normal = torch.mean((1 - labels) * torch.abs(deviation))
    loss_anomaly = torch.mean(labels * torch.clamp(threshold - deviation, min=0))
    return loss_normal + loss_anomaly
# Step 4: Training the model
def train_model(model, X_train, y_train, num_epochs=50, batch_size=32, learning_rate=0.01):
    """Fit ``model`` on the training data with Adam and the deviation loss.

    For every mini-batch the reference score is the mean predicted score of
    the batch's normal samples (label 0); the loss then measures how far each
    score deviates from that reference.

    Args:
        model: Module mapping a float feature batch to one score per sample.
        X_train: Array-like of training features, one row per sample.
        y_train: Array-like of labels (0 normal, 1 anomaly), one per sample.
        num_epochs: Number of passes over the training data.
        batch_size: Mini-batch size passed to the data loader.
        learning_rate: Learning rate for the Adam optimizer.

    Returns:
        List with the summed batch loss of each epoch (the same value that is
        printed for that epoch).
    """
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = deviation_loss

    features = torch.as_tensor(X_train, dtype=torch.float32)
    labels = torch.as_tensor(y_train, dtype=torch.float32)
    dataset = torch.utils.data.TensorDataset(features, labels)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    epoch_losses = []
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        for X_batch, y_batch in dataloader:
            normal_mask = y_batch == 0
            if not normal_mask.any():
                # Without normal samples the reference would be the mean of
                # an empty tensor (NaN), and backpropagating it would turn
                # the weights into NaN; skip the batch instead.
                continue
            optimizer.zero_grad()
            # reshape(-1) keeps a 1-D score vector even for a batch of one;
            # a bare squeeze() would yield a 0-d tensor that cannot be
            # indexed with the 1-D mask below.
            anomaly_scores = model(X_batch).reshape(-1)
            reference_score = torch.mean(anomaly_scores[normal_mask])
            loss = criterion(anomaly_scores, reference_score, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        epoch_losses.append(total_loss)
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss:.4f}")
    return epoch_losses
# Step 5: Evaluate the model
def evaluate_model(model, X_test, y_test):
    """Score the test set, print the AUC-ROC and plot the ROC curve.

    Args:
        model: Trained module mapping a float feature batch to one score per
            sample.
        X_test: Array-like of test features, one row per sample.
        y_test: True labels (0 normal, 1 anomaly) for the test samples.

    Returns:
        The AUC-ROC value computed by ``roc_auc_score`` (also printed).
    """
    model.eval()
    with torch.no_grad():
        features = torch.as_tensor(X_test, dtype=torch.float32)
        # reshape(-1) always gives a 1-D score vector, one entry per sample,
        # whereas squeeze() would drop every size-1 dimension.
        anomaly_scores = model(features).reshape(-1).numpy()

    auc = roc_auc_score(y_test, anomaly_scores)
    print(f"AUC-ROC: {auc:.4f}")

    # Step 6: Plot the ROC curve
    fpr, tpr, _ = roc_curve(y_test, anomaly_scores)
    plt.figure()
    plt.plot(fpr, tpr, label=f"ROC curve (AUC = {auc:.4f})")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("ROC Curve")
    plt.legend(loc="best")
    plt.grid(True)
    plt.show()
    return auc
# Run the training and evaluation
# Size the network from the number of feature columns, fit it on the
# training split, then print the AUC-ROC and plot the ROC curve for the
# held-out split.
input_dim = X_train.shape[1]
model = AnomalyScoringNetwork(input_dim=input_dim)
train_model(model=model, X_train=X_train, y_train=y_train)
evaluate_model(model=model, X_test=X_test, y_test=y_test)
# Editor is loading...
# Leave a Comment