# model_train_eval.py
"""
Trains a Transformer model on saved data, evaluates predictability, and saves models to mar2025/models/.
Run after data_download.py in Google Colab (GPU recommended).
Feeds time features and an observed mask to TimeSeriesTransformerForPrediction; trains with the
model's built-in negative log-likelihood loss and forecasts with generate().
"""
from google.colab import drive
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import torch
from transformers import TimeSeriesTransformerForPrediction, TimeSeriesTransformerConfig
import os
# Mount Google Drive
drive.mount('/content/drive')
# Configuration
BASE_DIR = '/content/drive/MyDrive/mar2025'
DATA_PATH = os.path.join(BASE_DIR, 'data')
MODEL_PATH = os.path.join(BASE_DIR, 'models')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Load data
def load_data(symbol):
    file_path = os.path.join(DATA_PATH, f"{symbol}.csv")
    if os.path.exists(file_path):
        return pd.read_csv(file_path, index_col=0, parse_dates=True)
    raise FileNotFoundError(f"Data for {symbol} not found at {file_path}")
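# Expected CSV layout (written by data_download.py): a date index plus the columns
# 'value', 'day', 'month', 'year', and 'day_of_year'; any other columns are ignored here.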
# Prepare sequences with time features
def create_sequences(data, context_length, prediction_length, lags_sequence):
    scaler = MinMaxScaler(feature_range=(0, 1))
    value_scaled = scaler.fit_transform(data[['value']])
    time_features = data[['day', 'month', 'year', 'day_of_year']].values  # Produced by data_download.py
    # The model looks back up to max(lags_sequence) steps beyond the context window,
    # so each past window must be context_length + max(lags_sequence) steps long.
    past_length = context_length + max(lags_sequence)
    if len(data) < past_length + prediction_length:
        raise ValueError(f"Data length {len(data)} too short for past window {past_length} and prediction_length {prediction_length}")
    X_values, y_values, X_time, y_time = [], [], [], []
    for i in range(len(data) - past_length - prediction_length + 1):
        X_values.append(value_scaled[i:i + past_length, 0])
        y_values.append(value_scaled[i + past_length:i + past_length + prediction_length, 0])
        X_time.append(time_features[i:i + past_length])
        y_time.append(time_features[i + past_length:i + past_length + prediction_length])
    X_values, y_values = np.array(X_values), np.array(y_values)
    X_time, y_time = np.array(X_time), np.array(y_time)
    if X_values.size == 0 or y_values.size == 0 or X_time.size == 0:
        raise ValueError("Empty sequence arrays")
    return X_values, y_values, X_time, y_time, scaler
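# Shape sketch (hypothetical numbers): with 100 rows, context_length=20,
# prediction_length=5, and lags up to 7 (past window of 27), this yields 69 windows:
# X_values (69, 27), y_values (69, 5), X_time (69, 27, 4), y_time (69, 5, 4).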
# Train and predict
def train_and_predict(data, symbol, context_length=20, prediction_length=5):
    lags_sequence = [1, 2, 3, 4, 5, 6, 7]  # matches the TimeSeriesTransformerConfig default
    try:
        X_values, y_values, X_time, y_time, scaler = create_sequences(data, context_length, prediction_length, lags_sequence)
    except ValueError as e:
        print(f"Skipping {symbol}: {e}")
        return None, None, None
    train_size = int(len(X_values) * 0.8)
    if train_size < 1:
        print(f"Skipping {symbol}: Not enough data for training split")
        return None, None, None
    X_values_train, X_values_test = X_values[:train_size], X_values[train_size:]
    y_values_train, y_values_test = y_values[:train_size], y_values[train_size:]
    X_time_train, X_time_test = X_time[:train_size], X_time[train_size:]
    y_time_train, y_time_test = y_time[:train_size], y_time[train_size:]
    # Convert to tensors
    X_values_train = torch.tensor(X_values_train, dtype=torch.float32).to(device)
    y_values_train = torch.tensor(y_values_train, dtype=torch.float32).to(device)
    X_time_train = torch.tensor(X_time_train, dtype=torch.float32).to(device)
    y_time_train = torch.tensor(y_time_train, dtype=torch.float32).to(device)
    X_values_test = torch.tensor(X_values_test, dtype=torch.float32).to(device)
    X_time_test = torch.tensor(X_time_test, dtype=torch.float32).to(device)
    y_values_test = torch.tensor(y_values_test, dtype=torch.float32).to(device)
    y_time_test = torch.tensor(y_time_test, dtype=torch.float32).to(device)
    # Create observed mask (all 1s, assuming no missing data)
    past_observed_mask_train = torch.ones_like(X_values_train, dtype=torch.float32).to(device)
    past_observed_mask_test = torch.ones_like(X_values_test, dtype=torch.float32).to(device)
    # Verify tensors
    for tensor, name in [
        (X_values_train, "X_values_train"), (y_values_train, "y_values_train"),
        (X_time_train, "X_time_train"), (y_time_train, "y_time_train"),
        (past_observed_mask_train, "past_observed_mask_train"),
        (X_values_test, "X_values_test"), (X_time_test, "X_time_test"),
        (y_time_test, "y_time_test"), (past_observed_mask_test, "past_observed_mask_test")
    ]:
        if tensor is None:
            raise ValueError(f"{name} is None for {symbol}")
    # Model configuration
    config = TimeSeriesTransformerConfig(
        prediction_length=prediction_length,
        context_length=context_length,
        input_size=1,                 # univariate 'value' series
        num_time_features=4,          # day, month, year, day_of_year
        lags_sequence=lags_sequence,  # past windows are context_length + max(lags) long
        d_model=64,
    )
    model = TimeSeriesTransformerForPrediction(config).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
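    # Note: TimeSeriesTransformerForPrediction is distributional; by default it
    # parameterizes a Student-t distribution and returns a negative log-likelihood
    # loss when future_values are supplied, so no explicit MSE criterion is needed.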
    # Training loop (full-batch; fine for the small window counts used here)
    for epoch in range(20):
        model.train()
        optimizer.zero_grad()
        outputs = model(
            past_values=X_values_train,
            past_time_features=X_time_train,
            past_observed_mask=past_observed_mask_train,
            future_values=y_values_train,
            future_time_features=y_time_train
        )
        loss = outputs.loss
        if loss is None:
            raise ValueError(f"Model returned no loss for {symbol} at epoch {epoch+1}")
        loss.backward()
        optimizer.step()
        print(f"{symbol} - Epoch {epoch+1}/20, Loss: {loss.item():.4f}")
    # Inference: generate() draws sample paths; average them for a point forecast
    model.eval()
    with torch.no_grad():
        outputs = model.generate(
            past_values=X_values_test,
            past_time_features=X_time_test,
            past_observed_mask=past_observed_mask_test,
            future_time_features=y_time_test
        )
    predictions = outputs.sequences.mean(dim=1).cpu().numpy()  # (batch, prediction_length)
    # Evaluate the last step of each prediction window, back in the original scale
    predictions = scaler.inverse_transform(predictions[:, -1:])
    y_test_inv = scaler.inverse_transform(y_values_test[:, -1:].cpu().numpy())
    # Save model
    if not os.path.exists(MODEL_PATH):
        os.makedirs(MODEL_PATH)
        print(f"Created directory: {MODEL_PATH}")
    torch.save(model.state_dict(), os.path.join(MODEL_PATH, f"{symbol}_model.pth"))
    return predictions, y_test_inv, model
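# Reloading a saved model later (sketch; the config must match training exactly):
#     config = TimeSeriesTransformerConfig(prediction_length=5, context_length=20,
#                                          input_size=1, num_time_features=4,
#                                          lags_sequence=[1, 2, 3, 4, 5, 6, 7], d_model=64)
#     model = TimeSeriesTransformerForPrediction(config)
#     model.load_state_dict(torch.load(os.path.join(MODEL_PATH, "AAPL_model.pth"),
#                                      map_location=device))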
# Detect turning points
def find_turning_points(series):
    """
    Identifies turning points (peaks and troughs) in a time series.
    - Peak: A point higher than both its predecessor and successor (local maximum).
    - Trough: A point lower than both its predecessor and successor (local minimum).
    Returns a list of (index, value, type) tuples.
    """
    turning_points = []
    for i in range(1, len(series) - 1):
        if series[i] > series[i-1] and series[i] > series[i+1]:
            turning_points.append((i, series[i], 'peak'))
        elif series[i] < series[i-1] and series[i] < series[i+1]:
            turning_points.append((i, series[i], 'trough'))
    return turning_points
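# Example: find_turning_points([1, 3, 2, 0, 4]) -> [(1, 3, 'peak'), (3, 0, 'trough')];
# endpoints and flat runs (equal neighbours) are never counted.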
# Calculate predictability
def calculate_predictability(true, pred):
    mae = np.mean(np.abs(true - pred))
    true_tp = find_turning_points(true.flatten())
    pred_tp = find_turning_points(pred.flatten())
    true_tp_set = set((tp[0], tp[2]) for tp in true_tp)
    pred_tp_set = set((tp[0], tp[2]) for tp in pred_tp)
    tp_accuracy = len(true_tp_set & pred_tp_set) / len(true_tp_set) if true_tp_set else 0
    return mae, tp_accuracy
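# A predicted turning point only counts if it lands on the same index with the same
# type: e.g. 4 true turning points with 3 such matches gives tp_accuracy = 0.75.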
# Main evaluation
def evaluate_instruments(instruments):
    results = {}
    prediction_data = {}
    for symbol in instruments:
        print(f"\nProcessing {symbol}...")
        try:
            data = load_data(symbol)
        except FileNotFoundError as e:
            print(f"Skipping {symbol}: {e}")
            continue
        predictions, y_test_inv, model = train_and_predict(data=data, symbol=symbol)
        if predictions is None or y_test_inv is None:
            print(f"Skipping {symbol} due to training failure")
            continue
        mae, tp_accuracy = calculate_predictability(y_test_inv, predictions)
        results[symbol] = {'MAE': mae, 'TP_Accuracy': tp_accuracy}
        prediction_data[symbol] = (predictions, y_test_inv)
    # Results table
    if not results:
        print("No instruments produced results.")
        return None
    results_df = pd.DataFrame(results).T
    results_df.columns = ['MAE', 'Turning Point Accuracy']
    results_df.index.name = 'Instrument'
    print("\nPredictability Results Table:")
    print(results_df.to_string(float_format="%.4f"))
    # Rank the top 10 (each model was already saved during training)
    top_10 = results_df.nlargest(10, 'Turning Point Accuracy').index
    print(f"\nTop 10 instruments saved: {list(top_10)}")
    # Plot top 3
    top_3 = results_df.nlargest(3, 'Turning Point Accuracy').index
    for symbol in top_3:
        predictions, y_test_inv = prediction_data[symbol]
        pred_tp = find_turning_points(predictions.flatten())
        plt.figure(figsize=(12, 6))
        plt.plot(y_test_inv, label='True Data', color='blue')
        plt.plot(predictions, label='Predicted Data', color='orange')
        for tp in pred_tp:
            plt.plot(tp[0], tp[1], marker='*', color='red', markersize=10,
                     label='Pred Turning Point' if tp == pred_tp[0] else "")
        plt.legend()
        plt.title(f'{symbol} Prediction with Turning Points (Top 3)')
        plt.xlabel('Time Step')
        plt.ylabel('Value')
        plt.savefig(os.path.join(DATA_PATH, f"{symbol}_top3_plot.png"))
        plt.show()
    return results_df
if __name__ == "__main__":
    instruments = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'NVDA', 'META', 'JPM', 'WMT', 'XOM', 'ES=F', 'CL=F', 'GC=F', 'SI=F']
    evaluate_instruments(instruments)