Model1.py
unknown
python
11 days ago
9.3 kB
3
Indexable
# model_train_eval.py
"""
Train a Transformer model on saved data, evaluate predictability, and save
models to mar2025/models/.

Run after data_download.py in Google Colab (GPU recommended).

Each instrument CSV is expected to provide a 'value' column plus the calendar
columns 'day', 'month', 'year' and 'day_of_year'. A Hugging Face
TimeSeriesTransformerForPrediction is trained per instrument with past/future
time features and an all-ones observed mask, then scored on mean absolute
error and on how many turning points of the true series it reproduces.
"""
from google.colab import drive
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import torch
from transformers import TimeSeriesTransformerForPrediction, TimeSeriesTransformerConfig
import os

# Mount Google Drive
drive.mount('/content/drive')

# Configuration
BASE_DIR = '/content/drive/MyDrive/mar2025'
DATA_PATH = os.path.join(BASE_DIR, 'data')
MODEL_PATH = os.path.join(BASE_DIR, 'models')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Calendar columns fed to the model as time features.
TIME_FEATURE_COLUMNS = ['day', 'month', 'year', 'day_of_year']
# Lags the model builds from the past window. The model needs
# context_length + max(LAGS_SEQUENCE) past steps per sample.
LAGS_SEQUENCE = [1, 2, 3, 4, 5, 6, 7]
NUM_EPOCHS = 20
RESULT_COLUMNS = ['MAE', 'Turning Point Accuracy']


# Load data
def load_data(symbol):
    """Read the saved CSV for *symbol* from DATA_PATH.

    Returns:
        DataFrame indexed by the first CSV column, parsed as dates.

    Raises:
        FileNotFoundError: if no CSV exists for the symbol.
    """
    file_path = os.path.join(DATA_PATH, f"{symbol}.csv")
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Data for {symbol} not found at {file_path}")
    return pd.read_csv(file_path, index_col=0, parse_dates=True)


def _build_windows(data, past_length, prediction_length):
    """Slice *data* into aligned sliding windows.

    Returns:
        (past_values, future_values, past_time, future_time, scaler) where
        the value arrays have shape (n, length, 1), the time arrays have
        shape (n, length, len(TIME_FEATURE_COLUMNS)), and *scaler* is the
        MinMaxScaler fitted on the 'value' column.

    Raises:
        ValueError: if the data is too short or a window length is zero.
    """
    # NOTE(review): the scaler is fitted on the whole series, so the later
    # test windows influence the scaling of the training windows.
    scaler = MinMaxScaler(feature_range=(0, 1))
    value_scaled = scaler.fit_transform(data[['value']])
    time_features = data[TIME_FEATURE_COLUMNS].values

    if len(data) < past_length + prediction_length:
        raise ValueError(
            f"Data length {len(data)} too short for context_length {past_length} "
            f"and prediction_length {prediction_length}"
        )

    n_windows = len(data) - past_length - prediction_length + 1
    horizon_end = past_length + prediction_length
    starts = range(n_windows)
    past_values = np.array([value_scaled[i:i + past_length] for i in starts])
    future_values = np.array(
        [value_scaled[i + past_length:i + horizon_end] for i in starts]
    )
    past_time = np.array([time_features[i:i + past_length] for i in starts])
    future_time = np.array(
        [time_features[i + past_length:i + horizon_end] for i in starts]
    )

    if past_values.size == 0 or future_values.size == 0 or past_time.size == 0:
        # The original message interpolated an undefined `symbol` here and
        # would have raised NameError instead of ValueError.
        raise ValueError("Empty sequence arrays")
    return past_values, future_values, past_time, future_time, scaler


# Prepare sequences with time features
def create_sequences(data, context_length, prediction_length):
    """Build scaled input/target windows and matching past time features.

    Returns:
        (X_values, y_values, X_time, scaler) with X_values of shape
        (n, context_length, 1), y_values of shape (n, prediction_length, 1)
        and X_time of shape (n, context_length, 4).

    Raises:
        ValueError: if the data is too short to form a single window.
    """
    X_values, y_values, X_time, _future_time, scaler = _build_windows(
        data, context_length, prediction_length
    )
    return (X_values, y_values, X_time, scaler)


# Train and predict
def train_and_predict(data, symbol, context_length=20, prediction_length=5):
    """Train a TimeSeriesTransformer on *data* and forecast the held-out tail.

    The first 80% of the sliding windows are used for training and the
    remainder for evaluation. The trained weights are saved to
    MODEL_PATH/<symbol>_model.pth.

    Returns:
        (predictions, y_test_inv, model). Both arrays have shape (n_test, 1)
        in the original value scale and hold the last step of each forecast
        horizon. Returns (None, None, None) if the symbol is skipped.
    """
    # The model derives lagged inputs from the past window, so each sample
    # must carry context_length + max(lag) steps of history.
    past_length = context_length + max(LAGS_SEQUENCE)
    required = past_length + prediction_length
    if len(data) < required:
        print(
            f"Skipping {symbol}: Data length {len(data)} too short; need at least "
            f"{required} rows (context_length {context_length} + max lag "
            f"{max(LAGS_SEQUENCE)} + prediction_length {prediction_length})"
        )
        return None, None, None

    try:
        past_values, future_values, past_time, future_time, scaler = _build_windows(
            data, past_length, prediction_length
        )
    except ValueError as e:
        print(f"Skipping {symbol}: {e}")
        return None, None, None

    train_size = int(len(past_values) * 0.8)
    if train_size < 1:
        print(f"Skipping {symbol}: Not enough data for training split")
        return None, None, None

    # Standardise the calendar features with training-window statistics so
    # that raw magnitudes (e.g. year ~ 2025) do not swamp the scaled values.
    time_mean = past_time[:train_size].mean(axis=(0, 1), keepdims=True)
    time_std = past_time[:train_size].std(axis=(0, 1), keepdims=True)
    time_std[time_std == 0] = 1.0  # constant feature: avoid division by zero
    past_time = (past_time - time_mean) / time_std
    future_time = (future_time - time_mean) / time_std

    def to_tensor(array):
        return torch.tensor(array, dtype=torch.float32).to(device)

    # With input_size=1 the model expects 2-D (batch, time) value tensors,
    # so the trailing feature axis is dropped.
    train_past_values = to_tensor(past_values[:train_size, :, 0])
    train_future_values = to_tensor(future_values[:train_size, :, 0])
    train_past_time = to_tensor(past_time[:train_size])
    train_future_time = to_tensor(future_time[:train_size])
    test_past_values = to_tensor(past_values[train_size:, :, 0])
    test_past_time = to_tensor(past_time[train_size:])
    test_future_time = to_tensor(future_time[train_size:])

    # Observed masks (all 1s, assuming no missing data)
    train_observed_mask = torch.ones_like(train_past_values)
    test_observed_mask = torch.ones_like(test_past_values)

    # Model configuration
    config = TimeSeriesTransformerConfig(
        prediction_length=prediction_length,
        context_length=context_length,
        input_size=1,  # For 'value'
        lags_sequence=LAGS_SEQUENCE,
        num_time_features=len(TIME_FEATURE_COLUMNS),  # day, month, year, day_of_year
        d_model=64,
    )
    model = TimeSeriesTransformerForPrediction(config).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Training loop (full batch). The model returns the negative
    # log-likelihood of its output distribution as `loss` when
    # future_values are supplied.
    for epoch in range(NUM_EPOCHS):
        model.train()
        optimizer.zero_grad()
        outputs = model(
            past_values=train_past_values,
            past_time_features=train_past_time,
            past_observed_mask=train_observed_mask,
            future_values=train_future_values,
            future_time_features=train_future_time,
        )
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        print(f"{symbol} - Epoch {epoch + 1}/{NUM_EPOCHS}, Loss: {loss.item():.4f}")

    # Inference: sample future trajectories and average them into a point
    # forecast. `sequences` is (batch, num_parallel_samples, prediction_length).
    model.eval()
    with torch.no_grad():
        generated = model.generate(
            past_values=test_past_values,
            past_time_features=test_past_time,
            future_time_features=test_future_time,
            past_observed_mask=test_observed_mask,
        )
    point_forecast = generated.sequences.mean(dim=1).cpu().numpy()

    # Compare the last step of each forecast horizon in the original scale.
    predictions = scaler.inverse_transform(point_forecast[:, -1:])
    y_test_inv = scaler.inverse_transform(future_values[train_size:, -1, :])

    # Save model
    os.makedirs(MODEL_PATH, exist_ok=True)
    torch.save(model.state_dict(), os.path.join(MODEL_PATH, f"{symbol}_model.pth"))

    return predictions, y_test_inv, model


# Detect turning points
def find_turning_points(series):
    """
    Identifies turning points (peaks and troughs) in a time series.
    - Peak: A point higher than both its predecessor and successor (local maximum).
    - Trough: A point lower than both its predecessor and successor (local minimum).
    Returns a list of (index, value, type) tuples.
    """
    turning_points = []
    for i in range(1, len(series) - 1):
        prev_val, cur_val, next_val = series[i - 1], series[i], series[i + 1]
        if cur_val > prev_val and cur_val > next_val:
            turning_points.append((i, cur_val, 'peak'))
        elif cur_val < prev_val and cur_val < next_val:
            turning_points.append((i, cur_val, 'trough'))
    return turning_points


# Calculate predictability
def calculate_predictability(true, pred):
    """Score predictions against the true series.

    Returns:
        (mae, tp_accuracy): the mean absolute error, and the fraction of
        true turning points matched by a predicted turning point of the same
        type at the same index (0 when the true series has none).
    """
    true = np.asarray(true)
    pred = np.asarray(pred)
    mae = np.mean(np.abs(true - pred))
    true_tp_set = {(idx, kind) for idx, _, kind in find_turning_points(true.flatten())}
    pred_tp_set = {(idx, kind) for idx, _, kind in find_turning_points(pred.flatten())}
    if not true_tp_set:
        return mae, 0
    return mae, len(true_tp_set & pred_tp_set) / len(true_tp_set)


# Main evaluation
def evaluate_instruments(instruments):
    """Train and score every instrument, print a results table, plot the top 3.

    Instruments whose data file is missing or whose training is skipped are
    reported and left out of the table.

    Returns:
        DataFrame indexed by instrument with columns 'MAE' and
        'Turning Point Accuracy' (empty if nothing could be evaluated).
    """
    results = {}
    prediction_data = {}
    for symbol in instruments:
        print(f"\nProcessing {symbol}...")
        try:
            data = load_data(symbol)
        except FileNotFoundError as e:
            # One missing file should not abort the remaining instruments.
            print(f"Skipping {symbol}: {e}")
            continue
        predictions, y_test_inv, _model = train_and_predict(data=data, symbol=symbol)
        if predictions is None or y_test_inv is None:
            print(f"Skipping {symbol} due to training failure")
            continue
        mae, tp_accuracy = calculate_predictability(y_test_inv, predictions)
        results[symbol] = {'MAE': mae, 'TP_Accuracy': tp_accuracy}
        prediction_data[symbol] = (predictions, y_test_inv)

    # Results table. Built row-wise with explicit columns so that an empty
    # result set still yields a well-formed (empty) frame.
    results_df = pd.DataFrame(
        [[row['MAE'], row['TP_Accuracy']] for row in results.values()],
        index=pd.Index(list(results), name='Instrument'),
        columns=RESULT_COLUMNS,
        dtype=float,
    )
    if results_df.empty:
        print("\nNo instruments were evaluated successfully.")
        return results_df

    print("\nPredictability Results Table:")
    print(results_df.to_string(float_format="%.4f"))

    # Save top 10 models (already saved, just rank)
    top_10 = results_df.nlargest(10, 'Turning Point Accuracy').index
    print(f"\nTop 10 instruments saved: {list(top_10)}")

    # Plot top 3
    top_3 = results_df.nlargest(3, 'Turning Point Accuracy').index
    for symbol in top_3:
        predictions, y_test_inv = prediction_data[symbol]
        pred_tp = find_turning_points(predictions.flatten())
        plt.figure(figsize=(12, 6))
        plt.plot(y_test_inv, label='True Data', color='blue')
        plt.plot(predictions, label='Predicted Data', color='orange')
        if pred_tp:
            # One call for all markers gives a single legend entry.
            plt.plot(
                [tp[0] for tp in pred_tp],
                [tp[1] for tp in pred_tp],
                linestyle='',
                marker='*',
                color='red',
                markersize=10,
                label='Pred Turning Point',
            )
        plt.legend()
        plt.title(f'{symbol} Prediction with Turning Points (Top 3)')
        plt.xlabel('Time Step')
        plt.ylabel('Value')
        plt.savefig(os.path.join(DATA_PATH, f"{symbol}_top3_plot.png"))
        plt.show()
        plt.close()  # release the figure; otherwise figures accumulate

    return results_df


if __name__ == "__main__":
    instruments = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA', 'NVDA', 'META',
                   'JPM', 'WMT', 'XOM', 'ES=F', 'CL=F', 'GC=F', 'SI=F']
    evaluate_instruments(instruments)
Editor is loading...
Leave a Comment