Untitled
unknown
plain_text
a month ago
25 kB
4
Indexable
import pandas as pd import numpy as np from datetime import datetime from dateutil.relativedelta import relativedelta import statsmodels.api as sm from scipy.stats import zscore ############################################### # 1. Data Loading (Filtering Out Weekends) ############################################### def load_price_data(filepath): """ Load historical prices from an Excel file. Assumes that the first column is dates and the remaining columns are tickers. Removes weekend data. """ df = pd.read_excel(filepath, index_col=0) df.index = pd.to_datetime(df.index) df = df.sort_index() # Filter out weekends (Saturday=5, Sunday=6) df = df[df.index.dayofweek < 5] return df def load_macro_data(filepath): """ Load macro indicators from an Excel file. Loads VIX and VIX3M from the 'Eq' sheet and other macro indicators from the 'Macro' sheet. FI signals are dropped (so no LF98TRUU is loaded) but FI tickers can still be used later. Removes weekend data and computes slopes for selected macro indicators. Also computes the VIX term structure and its exponential moving average. """ # VIX data vix_data = pd.read_excel(filepath, sheet_name='Eq', index_col=0, parse_dates=True, usecols=[0, 4, 5]) vix_data.columns = ['VIX', 'VIX3M'] vix_data = vix_data[vix_data.index.dayofweek < 5] # Macro data (assumed to include columns "CESIUSD Index", "INJCJC Index", ".HG/GC G Index", "Consumer Confidence") macro_data = pd.read_excel(filepath, sheet_name='Macro', index_col=0, parse_dates=True, usecols=range(8), skiprows=1) macro_data = macro_data[macro_data.index.dayofweek < 5] # Compute slopes for selected macro indicators. macro_data["Surprise Index Slope"] = macro_data["CESIUSD Index"].diff() macro_data["Jobless Claims Slope"] = macro_data["INJCJC Index"].diff() macro_data["Copper Gold Slope"] = macro_data['.HG/GC G Index'].diff() # Combine VIX data and macro data combined_data = pd.concat([vix_data, macro_data], axis=1) combined_data = combined_data.fillna(method='ffill').fillna(method='bfill') combined_data = combined_data.sort_index() # Compute VIX term structure and its EMA combined_data['VIX_Spread'] = combined_data['VIX'] - combined_data['VIX3M'] combined_data['VIX_Spread_EMA'] = combined_data['VIX_Spread'].ewm(span=5, adjust=False).mean() return combined_data ############################################### # 2. Helper: Observation Dates (Monthly) ############################################### def get_observation_dates(prices, start_date, end_date, rebalance_period): dates = [] current_date = start_date while current_date < end_date: candidate_date = (current_date + relativedelta(months=rebalance_period)).replace(day=1) while candidate_date not in prices.index: candidate_date += pd.Timedelta(days=1) if candidate_date.month != (current_date + relativedelta(months=rebalance_period)).month: candidate_date = None break if candidate_date is None or candidate_date > end_date: break dates.append(candidate_date) current_date = candidate_date return dates ############################################### # 3. Portfolio Initialization ############################################### def initialize_portfolio(prices, date, tickers, initial_aum): portfolio = {} mask = prices.index < date prev_date = prices.index[mask][-1] allocation = initial_aum / len(tickers) for ticker in tickers: price = prices.loc[prev_date, ticker] portfolio[ticker] = allocation / price return portfolio ############################################### # 4. Lookback Metric Computation ############################################### def compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type='simple'): prices = prices.sort_index() mask = prices.index < current_date prev_date = prices.index[mask][-1] lookback_date = prev_date - relativedelta(months=lookback_period) current_price = prices[ticker].asof(prev_date) lookback_price = prices[ticker].asof(lookback_date) if pd.isna(current_price) or pd.isna(lookback_price): raise ValueError(f"Missing price data for {ticker} on {prev_date} or {lookback_date}.") if metric_type == 'simple': metric = (current_price / lookback_price) - 1 elif metric_type == 'sma': window = prices[ticker].loc[lookback_date:current_date] if window.empty: raise ValueError(f"No price data for {ticker} between {lookback_date} and {current_date}.") sma = window.mean() metric = (current_price - sma) / sma else: raise ValueError("Invalid metric type. Choose 'simple' or 'sma'.") return metric ############################################### # 5. Ranking Assets by Momentum ############################################### def rank_assets(prices, current_date, tickers, lookback_period, metric_type): metrics = {} for ticker in tickers: metric = compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type) metrics[ticker] = metric sorted_tickers = sorted(metrics, key=metrics.get, reverse=True) ranks = {ticker: rank+1 for rank, ticker in enumerate(sorted_tickers)} return sorted_tickers, ranks, metrics ############################################### # 6. Compute Current Portfolio Value ############################################### def compute_portfolio_value(portfolio, prices, current_date): value = 0 for ticker, quantity in portfolio.items(): price = prices.loc[current_date, ticker] value += quantity * price return value ############################################### # 7. Rebalance the Momentum Portfolio ############################################### def rebalance_portfolio(portfolio, prices, current_date, tickers, sorted_tickers, internal_rebalance_ratios, rebalance_ratio): mask = prices.index < current_date prev_date = prices.index[mask][-1] prev_prices = prices.loc[prev_date] curr_prices = prices.loc[current_date] portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in tickers) rebalance_amount = portfolio_value * rebalance_ratio target_trades = {ticker: rebalance_amount * internal_rebalance_ratios[i] for i, ticker in enumerate(sorted_tickers)} total_sold = 0 actual_trades = {} for ticker, target_trade in target_trades.items(): if target_trade < 0: available_notional = portfolio[ticker] * curr_prices[ticker] sell_target = abs(target_trade) actual_sell = min(available_notional, sell_target) actual_trades[ticker] = -actual_sell total_sold += actual_sell else: actual_trades[ticker] = 0 total_buy_target = sum(t for t in target_trades.values() if t > 0) if total_buy_target > 0: for ticker, target_trade in target_trades.items(): if target_trade > 0: proportion = target_trade / total_buy_target buy_amount = total_sold * proportion actual_trades[ticker] = buy_amount new_portfolio = portfolio.copy() for ticker, trade_notional in actual_trades.items(): execution_price = curr_prices[ticker] qty_change = trade_notional / execution_price new_portfolio[ticker] += qty_change return new_portfolio, actual_trades, portfolio_value def adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70): mask = prices.index < current_date prev_date = prices.index[mask][-1] prev_prices = prices.loc[prev_date] curr_prices = prices.loc[current_date] portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in portfolio) weights = {ticker: (portfolio[ticker] * prev_prices[ticker]) / portfolio_value for ticker in portfolio} new_portfolio = portfolio.copy() for overweight in portfolio: if weights[overweight] > threshold: extra_weight = weights[overweight] - threshold extra_value = extra_weight * portfolio_value execution_price_over = curr_prices[overweight] qty_reduce = extra_value / execution_price_over new_portfolio[overweight] -= qty_reduce remaining_value = extra_value for candidate in sorted_tickers: if candidate == overweight: continue candidate_value = new_portfolio[candidate] * curr_prices[candidate] candidate_weight = candidate_value / portfolio_value if candidate_weight < threshold: capacity = (threshold - candidate_weight) * portfolio_value allocation = min(remaining_value, capacity) qty_add = allocation / curr_prices[candidate] new_portfolio[candidate] += qty_add remaining_value -= allocation if remaining_value <= 0: break return new_portfolio ############################################### # 8. Rolling OLS Regression for VIX Signal ############################################### def rolling_regression_signal(prices, macro_data, current_date, regression_window=100): """ Run a rolling OLS regression on the past regression_window days to predict SPY returns based on the VIX futures term structure. The regression specification is: R_{t+1} = α + β * (Slope_positive)_t + γ * (Slope_negative)_t + ε_t where: Slope_positive = VIX_Spread_EMA if positive, else 0 Slope_negative = VIX_Spread_EMA if negative, else 0 Returns the predicted return for the next period using yesterday's slope values. """ # Select available dates up to current_date (need at least regression_window+1 data points) available_dates = macro_data.index[macro_data.index < current_date] if len(available_dates) < regression_window + 1: return None # Not enough data to run regression # Use the most recent regression_window+1 dates sample_dates = available_dates[-(regression_window+1):] X = [] y = [] for i in range(len(sample_dates) - 1): t = sample_dates[i] t_next = sample_dates[i+1] slope = macro_data.loc[t, 'VIX_Spread_EMA'] slope_pos = slope if slope > 0 else 0 slope_neg = slope if slope < 0 else 0 # Get SPY return from t to t_next: try: price_t = prices.loc[t, 'SPY US Equity'] price_t_next = prices.loc[t_next, 'SPY US Equity'] except KeyError: continue ret = (price_t_next / price_t) - 1 X.append([slope_pos, slope_neg]) y.append(ret) if len(X) < regression_window: # not enough data after filtering return None X = np.array(X) y = np.array(y) # Add constant term X = sm.add_constant(X) model = sm.OLS(y, X).fit(cov_type='HAC', cov_kwds={'maxlags':1}) # Use yesterday's slope values for prediction yesterday = available_dates[-1] slope_yesterday = macro_data.loc[yesterday, 'VIX_Spread_EMA'] slope_pos_yesterday = slope_yesterday if slope_yesterday > 0 else 0 slope_neg_yesterday = slope_yesterday if slope_yesterday < 0 else 0 X_pred = np.array([1, slope_pos_yesterday, slope_neg_yesterday]) predicted_return = model.predict(X_pred)[0] return predicted_return def map_predicted_return_to_alloc(pred_return, macro_min_alloc=0.6): """ Map the predicted SPY return to a target allocation. If the predicted return is non-negative, allocate 100% (risk-on). If negative, linearly scale the allocation between 100% and macro_min_alloc. For example, if pred_return is -0.20 or lower, target_alloc = macro_min_alloc. """ if pred_return is None or pred_return >= 0: return 1.0 max_neg = 0.20 # maximum negative return considered factor = (1.0 - macro_min_alloc) / max_neg alloc = 1.0 - factor * abs(pred_return) alloc = max(min(alloc, 1.0), macro_min_alloc) return alloc ############################################### # 9. Helper Functions for Cash (using a cash ticker) ############################################### def invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker): """ When switching from risk-off to risk-on, sell the cash instrument (e.g. SHV) and reinvest its proceeds into the portfolio using previous-day weights. """ cash_price = prices.loc[current_date, cash_ticker] cash_available = cash_qty * cash_price if cash_available <= 0: return portfolio, cash_qty, f"No {cash_ticker} to reinvest." mask = prices.index < current_date prev_date = prices.index[mask][-1] mom_value = compute_portfolio_value(portfolio, prices, current_date) new_portfolio = portfolio.copy() for ticker in portfolio: total_qty = (portfolio[ticker] * (mom_value + cash_available)) / mom_value if mom_value > 0 else 1/len(portfolio) new_portfolio[ticker] = total_qty return new_portfolio, 0.0, f"Reinvested {cash_available:,.2f} from {cash_ticker}" def allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker): """ Adjusts portfolio positions based on the target allocation for cash. When deploying cash, scales down/up existing positions proportionally to reach the target allocation. """ mask = prices.index < current_date prev_date = prices.index[mask][-1] curr_value = compute_portfolio_value(portfolio, prices, prev_date) cash_price = prices.loc[prev_date, cash_ticker] cash_equiv = cash_qty * cash_price total_aum = curr_value + cash_equiv desired_cash = (1 - target_alloc) * total_aum cash_price = prices.loc[current_date, cash_ticker] cash_equiv = cash_qty * cash_price current_cash = cash_equiv new_portfolio = portfolio.copy() new_cash_qty = cash_qty note = "" if desired_cash > current_cash: cash_to_raise = desired_cash - current_cash curr_value = compute_portfolio_value(portfolio, prices, current_date) for ticker in portfolio: price = prices.loc[current_date, ticker] sell_ratio = (cash_to_raise / curr_value) qty_to_sell = portfolio[ticker] * sell_ratio new_portfolio[ticker] -= qty_to_sell new_cash_qty += cash_to_raise / cash_price note = f"Raised {cash_to_raise:,.2f} into {cash_ticker}" elif desired_cash < current_cash: current_portfolio_value = sum( portfolio[ticker] * prices.loc[current_date, ticker] for ticker in portfolio ) desired_risk_allocation = total_aum * target_alloc scaling_factor = desired_risk_allocation / current_portfolio_value for ticker in portfolio: new_portfolio[ticker] = portfolio[ticker] * scaling_factor excess_cash = current_cash - desired_cash new_cash_qty -= excess_cash / cash_price note = f"Deployed {excess_cash:,.2f} from {cash_ticker} into portfolio" return new_portfolio, new_cash_qty, note ############################################### # 10. Simulation: Strategy with Cash & VIX Regression Signal ############################################### def simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers, initial_aum, start_date, end_date, rebalance_period, rebalance_ratio, lookback_period, metric_type, internal_rebalance_ratios, regression_window=100, cash_ticker='SHV US Equity', macro_min_alloc=0.6, max_alloc_change=0.05): """ This simulation: - Uses a designated cash_ticker (e.g. "SHV US Equity") for the cash component. - Excludes the cash_ticker from momentum ranking/trading. - When target allocation is below 100% (risk-off), excess portfolio value is sold into the cash_ticker. - When switching back to risk-on, the cash position is reinvested. - The regime and allocation signals are now based solely on a rolling regression of the VIX futures term structure. - The regression is run on a rolling window (e.g. 100 days) to predict SPY returns. - If the predicted return is negative, the strategy goes risk-off and scales the target allocation down. """ all_tickers = eq_tickers + fi_tickers + alts_tickers momentum_tickers = [t for t in all_tickers if t != cash_ticker] monthly_dates = get_observation_dates(prices, start_date, end_date, rebalance_period) daily_dates = prices.index.sort_values() daily_dates = daily_dates[(daily_dates >= start_date) & (daily_dates <= end_date)] portfolio = initialize_portfolio(prices, start_date, momentum_tickers, initial_aum) cash_qty = 0.0 current_regime = 'risk-on' target_alloc = 1.0 prev_total_aum = initial_aum results = [] for current_date in daily_dates: daily_note = "No adjustment" # Generate signal using rolling regression on VIX term structure. pred_return = rolling_regression_signal(prices, macro_data, current_date, regression_window) if pred_return is None: # Not enough data; default to risk-on. pred_return = 0.0 # Map predicted return to a target allocation. if pred_return < 0: current_regime = 'risk-off' else: current_regime = 'risk-on' target_alloc = map_predicted_return_to_alloc(pred_return, macro_min_alloc) # For reference, record the predicted return and target allocation. daily_note += f" | Predicted SPY return: {pred_return:.4f}, Target Alloc: {target_alloc:.2f}" mask = prices.index < current_date prev_date = prices.index[mask][-1] mom_value = compute_portfolio_value(portfolio, prices, current_date) # Adjust cash allocation if needed. if current_regime == 'risk-on' and cash_qty > 0: portfolio, cash_qty, note_update = invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker) daily_note += " | " + note_update elif current_regime == 'risk-off': portfolio, cash_qty, note_update = allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker) daily_note += " | " + note_update # Monthly rebalancing using momentum ranking. if current_date in monthly_dates: sorted_tickers, ranks, metrics = rank_assets(prices, current_date, momentum_tickers, lookback_period, metric_type) temp_portfolio, trades, pre_rebalance_value = rebalance_portfolio(portfolio, prices, current_date, momentum_tickers, sorted_tickers, internal_rebalance_ratios, rebalance_ratio) temp_portfolio = adjust_overweight(temp_portfolio, prices, current_date, sorted_tickers, threshold=0.70) temp_value = compute_portfolio_value(temp_portfolio, prices, current_date) # Example: if the combined weight of SPY and HYG is too low when risk-off, force risk-on. spy_temp = temp_portfolio.get('SPY US Equity', 0) * prices.loc[current_date, 'SPY US Equity'] hyg_temp = temp_portfolio.get('HYG US Equity', 0) * prices.loc[current_date, 'HYG US Equity'] combined_weight = (spy_temp + hyg_temp) / temp_value if temp_value > 0 else 0 if current_regime == 'risk-off' and combined_weight < 0.40: current_regime = 'risk-on' target_alloc = 1.0 daily_note += " | Monthly: Forced risk-on due to SPY+HYG weight < 40%." total_aum = compute_portfolio_value(portfolio, prices, current_date) + cash_qty * prices.loc[current_date, cash_ticker] simulated_value = temp_value new_portfolio = {} for ticker in temp_portfolio: price = prices.loc[current_date, ticker] simulated_weight = (temp_portfolio[ticker] * price) / simulated_value if simulated_value > 0 else 1/len(temp_portfolio) new_qty = (total_aum * simulated_weight) / price new_portfolio[ticker] = new_qty portfolio = new_portfolio cash_qty = 0 else: portfolio = temp_portfolio curr_value = compute_portfolio_value(portfolio, prices, current_date) total_aum = curr_value + cash_qty * prices.loc[current_date, cash_ticker] desired_value = target_alloc * total_aum if curr_value > desired_value: portfolio, cash_qty, note_update = allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker) daily_note += " | Monthly: " + note_update elif curr_value < desired_value and cash_qty > 0: portfolio, cash_qty, note_update = allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker) daily_note += " | Monthly: " + note_update current_mom_value = compute_portfolio_value(portfolio, prices, current_date) cash_price = prices.loc[current_date, cash_ticker] cash_value = cash_qty * cash_price total_aum = current_mom_value + cash_value ret = (total_aum - prev_total_aum) / prev_total_aum if prev_total_aum > 0 else 0 prev_total_aum = total_aum row = { 'Date': current_date, 'Momentum AUM': current_mom_value, 'Cash Qty': cash_qty, 'Cash Price': cash_price, 'Cash Value': cash_value, 'Total AUM': total_aum, 'Current Regime': current_regime, 'Target Alloc': target_alloc, 'Predicted SPY Return': pred_return, 'Adjustment Note': daily_note, 'Return': ret, 'Event': 'Monthly Rebalance' if current_date in monthly_dates else 'Daily Check' } for ticker in momentum_tickers: price = prices.loc[current_date, ticker] qty = portfolio[ticker] notional = qty * price row[f'qty_{ticker}'] = qty row[f'notional_{ticker}'] = notional row[f'weight_{ticker}'] = (notional / current_mom_value) if current_mom_value > 0 else np.nan if current_date in monthly_dates: row[f'rank_{ticker}'] = ranks.get(ticker, np.nan) row[f'metric_{ticker}'] = metrics.get(ticker, np.nan) row[f'trade_{ticker}'] = trades.get(ticker, 0) else: row[f'rank_{ticker}'] = np.nan row[f'metric_{ticker}'] = np.nan row[f'trade_{ticker}'] = 0 results.append(row) result_df = pd.DataFrame(results) result_df.set_index('Date', inplace=True) return result_df ############################################### # 11. Main – Example Usage ############################################### if __name__ == '__main__': eq_tickers = ['SPY US Equity'] fi_tickers = ['TLT US Equity', 'HYG US Equity'] alts_tickers = ['GLD US Equity', 'IGSB US Equity'] initial_aum = 100e6 start_date = pd.to_datetime('2008-01-01') end_date = pd.to_datetime('2025-02-01') rebalance_period = 1 rebalance_ratio = 0.2 lookback_period = 6 metric_type = 'simple' internal_rebalance_ratios = [0.8, 0.2, 0, -0.2, -0.8] price_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Codes\Historic Prices.xlsx" macro_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Momentum Strategy Overlay Data.xlsx" prices = load_price_data(price_filepath) macro_data = load_macro_data(macro_filepath) result_df = simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers, initial_aum, start_date, end_date, rebalance_period, rebalance_ratio, lookback_period, metric_type, internal_rebalance_ratios, regression_window=100, cash_ticker='SHV US Equity', macro_min_alloc=0.6, max_alloc_change=0.05) pd.set_option('display.float_format', lambda x: f'{x:,.2f}') # For example, to display the last few rows: # print(result_df[['Total AUM', 'Momentum AUM', 'Cash Price']].tail())
Editor is loading...
Leave a Comment