# Untitled
# unknown
# plain_text
# 7 months ago
# 25 kB
# 5
# Indexable
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil.relativedelta import relativedelta
import statsmodels.api as sm
from scipy.stats import zscore
###############################################
# 1. Data Loading (Filtering Out Weekends)
###############################################
def load_price_data(filepath):
    """
    Load historical prices from an Excel file.

    The first column of the sheet becomes the index and is converted to
    datetimes; every remaining column is kept unchanged (the original author
    assumes one column per ticker).

    Args:
        filepath: Path of the Excel workbook, passed to ``pd.read_excel``.

    Returns:
        pandas.DataFrame sorted by date with Saturday/Sunday rows removed.
    """
    df = pd.read_excel(filepath, index_col=0)
    df.index = pd.to_datetime(df.index)
    df = df.sort_index()
    # dayofweek: Monday=0 ... Saturday=5, Sunday=6 -> keep weekdays only.
    return df[df.index.dayofweek < 5]
def load_macro_data(filepath):
    """
    Load VIX and macro indicators from an Excel workbook and derive signals.

    Reads columns 0, 4 and 5 of the 'Eq' sheet (renamed to 'VIX' and 'VIX3M')
    and the first eight columns of the 'Macro' sheet (skipping its first row).
    The 'Macro' sheet must contain the columns "CESIUSD Index",
    "INJCJC Index" and ".HG/GC G Index", whose first differences are added as
    slope columns.

    Args:
        filepath: Path of the Excel workbook, passed to ``pd.read_excel``.

    Returns:
        pandas.DataFrame indexed by date (weekends removed, sorted) holding
        the VIX columns, the macro columns, the three slope columns,
        'VIX_Spread' (VIX - VIX3M) and 'VIX_Spread_EMA' (5-span exponential
        moving average of the spread).
    """
    # VIX data
    vix_data = pd.read_excel(filepath, sheet_name='Eq', index_col=0,
                             parse_dates=True, usecols=[0, 4, 5])
    vix_data.columns = ['VIX', 'VIX3M']
    vix_data = vix_data[vix_data.index.dayofweek < 5].sort_index()

    # Macro data
    macro_data = pd.read_excel(filepath, sheet_name='Macro', index_col=0,
                               parse_dates=True, usecols=range(8), skiprows=1)
    # Sort before differencing so each slope is "this row minus the previous date".
    macro_data = macro_data[macro_data.index.dayofweek < 5].sort_index()

    # Compute slopes for selected macro indicators.
    macro_data["Surprise Index Slope"] = macro_data["CESIUSD Index"].diff()
    macro_data["Jobless Claims Slope"] = macro_data["INJCJC Index"].diff()
    macro_data["Copper Gold Slope"] = macro_data['.HG/GC G Index'].diff()

    # Combine on the union of dates; sort BEFORE filling so that forward-fill
    # always propagates values forward in time.
    combined_data = pd.concat([vix_data, macro_data], axis=1).sort_index()
    # .ffill()/.bfill() replace the deprecated fillna(method=...) spelling.
    # NOTE: the trailing bfill only affects leading gaps, but it does copy
    # later observations backwards into the earliest rows.
    combined_data = combined_data.ffill().bfill()

    # Compute VIX term structure and its EMA
    combined_data['VIX_Spread'] = combined_data['VIX'] - combined_data['VIX3M']
    combined_data['VIX_Spread_EMA'] = combined_data['VIX_Spread'].ewm(span=5, adjust=False).mean()
    return combined_data
###############################################
# 2. Helper: Observation Dates (Monthly)
###############################################
def get_observation_dates(prices, start_date, end_date, rebalance_period):
    """
    Build the list of rebalance (observation) dates.

    Starting from ``start_date``, repeatedly step ``rebalance_period`` months
    ahead, go to the 1st of that month and take the first date of that month
    that is present in ``prices.index``.

    Args:
        prices: DataFrame whose index holds the available trading dates.
        start_date: Date the schedule is anchored on (it is never itself
            returned; the first observation is one period later).
        end_date: Last admissible observation date (inclusive).
        rebalance_period: Number of months between observations; must be >= 1.

    Returns:
        List of dates in increasing order. Generation stops at the first
        month that has no date in ``prices.index`` or whose first available
        date is after ``end_date``.

    Raises:
        ValueError: If ``rebalance_period`` is smaller than 1 (the schedule
            would never advance and the loop would not terminate).
    """
    if rebalance_period < 1:
        raise ValueError("rebalance_period must be a positive number of months.")

    step = relativedelta(months=rebalance_period)
    trading_days = prices.index
    dates = []
    current_date = start_date
    while current_date < end_date:
        target = current_date + step
        candidate_date = target.replace(day=1)
        # Walk forward day by day until a trading date is found, giving up
        # once the walk leaves the target month.
        while candidate_date not in trading_days:
            candidate_date += pd.Timedelta(days=1)
            if candidate_date.month != target.month:
                candidate_date = None
                break
        if candidate_date is None or candidate_date > end_date:
            break
        dates.append(candidate_date)
        current_date = candidate_date
    return dates
###############################################
# 3. Portfolio Initialization
###############################################
def initialize_portfolio(prices, date, tickers, initial_aum):
    """
    Create an equal-weighted starting portfolio.

    ``initial_aum`` is split evenly across ``tickers`` and converted into
    quantities using the prices of the last date in ``prices.index`` that is
    strictly before ``date``.

    Args:
        prices: DataFrame of prices (date index, one column per ticker).
        date: Inception date; sizing uses the preceding available date.
        tickers: Tickers to hold.
        initial_aum: Total notional to invest.

    Returns:
        Dict mapping each ticker to the quantity held.

    Raises:
        ValueError: If ``tickers`` is empty or no price date precedes ``date``.
    """
    if len(tickers) == 0:
        raise ValueError("tickers must contain at least one ticker.")
    earlier_dates = prices.index[prices.index < date]
    if len(earlier_dates) == 0:
        raise ValueError(f"No price data available before {date}.")
    prev_date = earlier_dates[-1]
    allocation = initial_aum / len(tickers)
    return {ticker: allocation / prices.loc[prev_date, ticker] for ticker in tickers}
###############################################
# 4. Lookback Metric Computation
###############################################
def compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type='simple'):
    """
    Compute a momentum metric for one ticker using only data before ``current_date``.

    The reference date is the last date in ``prices.index`` strictly before
    ``current_date``; the lookback date lies ``lookback_period`` months before
    that reference date. Prices are read with ``Series.asof`` so the most
    recent non-missing observation at or before each date is used.

    Args:
        prices: DataFrame of prices (date index, one column per ticker).
        current_date: Decision date; its own price is never used.
        ticker: Column of ``prices`` to evaluate.
        lookback_period: Lookback length in months.
        metric_type: 'simple' -> reference price / lookback price - 1;
            'sma' -> relative distance of the reference price from the mean
            price between the lookback date and the reference date.

    Returns:
        The metric as a float.

    Raises:
        ValueError: For an unknown ``metric_type``, when no date precedes
            ``current_date``, or when a required price is missing.
    """
    if metric_type not in ('simple', 'sma'):
        raise ValueError("Invalid metric type. Choose 'simple' or 'sma'.")

    # asof() and label slicing need a sorted index; sort only when required.
    if not prices.index.is_monotonic_increasing:
        prices = prices.sort_index()
    series = prices[ticker]

    earlier_dates = series.index[series.index < current_date]
    if len(earlier_dates) == 0:
        raise ValueError(f"No price data for {ticker} before {current_date}.")
    prev_date = earlier_dates[-1]
    lookback_date = prev_date - relativedelta(months=lookback_period)

    current_price = series.asof(prev_date)
    lookback_price = series.asof(lookback_date)
    if pd.isna(current_price) or pd.isna(lookback_price):
        raise ValueError(f"Missing price data for {ticker} on {prev_date} or {lookback_date}.")

    if metric_type == 'simple':
        return (current_price / lookback_price) - 1

    # Label slicing is inclusive on both ends, so the window must stop at
    # prev_date: stopping at current_date would pull the decision-day price
    # (not yet known at decision time) into the average.
    window = series.loc[lookback_date:prev_date]
    if window.empty:
        raise ValueError(f"No price data for {ticker} between {lookback_date} and {prev_date}.")
    sma = window.mean()
    return (current_price - sma) / sma
###############################################
# 5. Ranking Assets by Momentum
###############################################
def rank_assets(prices, current_date, tickers, lookback_period, metric_type):
    """
    Rank tickers by their lookback metric, best first.

    Args:
        prices: DataFrame of prices (date index, one column per ticker).
        current_date: Decision date forwarded to ``compute_lookback_metric``.
        tickers: Tickers to rank.
        lookback_period: Lookback length in months.
        metric_type: Metric flavour forwarded to ``compute_lookback_metric``.

    Returns:
        Tuple ``(sorted_tickers, ranks, metrics)`` where ``sorted_tickers`` is
        ordered by descending metric (ties keep input order), ``ranks`` maps
        ticker -> 1-based rank and ``metrics`` maps ticker -> metric value.
    """
    metrics = {
        ticker: compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type)
        for ticker in tickers
    }
    sorted_tickers = sorted(metrics, key=metrics.get, reverse=True)
    ranks = {ticker: position for position, ticker in enumerate(sorted_tickers, start=1)}
    return sorted_tickers, ranks, metrics
###############################################
# 6. Compute Current Portfolio Value
###############################################
def compute_portfolio_value(portfolio, prices, current_date):
    """
    Value a portfolio at the prices of ``current_date``.

    Args:
        portfolio: Dict mapping ticker -> quantity.
        prices: DataFrame of prices (date index, one column per ticker).
        current_date: Valuation date; must be a label of ``prices.index``.

    Returns:
        Sum of quantity * price over all holdings (0 for an empty portfolio).
    """
    return sum(
        quantity * prices.loc[current_date, ticker]
        for ticker, quantity in portfolio.items()
    )
###############################################
# 7. Rebalance the Momentum Portfolio
###############################################
def rebalance_portfolio(portfolio, prices, current_date, tickers, sorted_tickers,
                        internal_rebalance_ratios, rebalance_ratio):
    """
    Shift notional from low-ranked to high-ranked tickers.

    The rebalance budget is ``rebalance_ratio`` times the portfolio value at
    the previous available date. Each ticker's target trade is the budget
    times the ratio at its rank position (negative = sell, positive = buy).
    Sells are capped at the notional actually held at ``current_date`` prices;
    the total proceeds are then split among the buy targets pro rata, so the
    trade is self-financing. All trades execute at ``current_date`` prices.

    Args:
        portfolio: Dict mapping ticker -> quantity (not modified).
        prices: DataFrame of prices (date index, one column per ticker).
        current_date: Execution date.
        tickers: Tickers included in the portfolio valuation.
        sorted_tickers: Tickers ordered best to worst.
        internal_rebalance_ratios: Per-rank trade ratios, indexed like
            ``sorted_tickers``.
        rebalance_ratio: Fraction of portfolio value used as the budget.

    Returns:
        Tuple ``(new_portfolio, actual_trades, portfolio_value)`` where
        ``actual_trades`` maps ticker -> signed traded notional and
        ``portfolio_value`` is the previous-date valuation.
    """
    prev_date = prices.index[prices.index < current_date][-1]
    prev_prices = prices.loc[prev_date]
    curr_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in tickers)
    rebalance_amount = portfolio_value * rebalance_ratio
    target_trades = {
        ticker: rebalance_amount * internal_rebalance_ratios[rank]
        for rank, ticker in enumerate(sorted_tickers)
    }

    # Pass 1: execute sells, never selling more than is held.
    total_sold = 0
    actual_trades = {}
    for ticker, target_trade in target_trades.items():
        if target_trade < 0:
            available_notional = portfolio[ticker] * curr_prices[ticker]
            actual_sell = min(available_notional, abs(target_trade))
            actual_trades[ticker] = -actual_sell
            total_sold += actual_sell
        else:
            actual_trades[ticker] = 0

    # Pass 2: distribute the proceeds over the buy targets pro rata.
    total_buy_target = sum(t for t in target_trades.values() if t > 0)
    if total_buy_target > 0:
        for ticker, target_trade in target_trades.items():
            if target_trade > 0:
                proportion = target_trade / total_buy_target
                actual_trades[ticker] = total_sold * proportion

    new_portfolio = portfolio.copy()
    for ticker, trade_notional in actual_trades.items():
        new_portfolio[ticker] += trade_notional / curr_prices[ticker]
    return new_portfolio, actual_trades, portfolio_value
def adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70):
    """
    Trim any position whose weight exceeds ``threshold`` and redistribute it.

    Weights are measured at the previous available date's prices. For each
    overweight ticker the excess value is sold at ``current_date`` prices and
    handed to the other tickers in ``sorted_tickers`` order, each receiving at
    most the amount that would bring it up to ``threshold``.

    Args:
        portfolio: Dict mapping ticker -> quantity (not modified).
        prices: DataFrame of prices (date index, one column per ticker).
        current_date: Execution date.
        sorted_tickers: Tickers in priority order for receiving the excess.
        threshold: Maximum weight allowed for a single ticker.

    Returns:
        New dict mapping ticker -> quantity. If the previous-date portfolio
        value is not positive, an unchanged copy is returned.
    """
    prev_date = prices.index[prices.index < current_date][-1]
    prev_prices = prices.loc[prev_date]
    curr_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in portfolio)
    new_portfolio = portfolio.copy()
    # Weights are undefined without positive value; nothing can be overweight.
    if not portfolio_value > 0:
        return new_portfolio

    weights = {
        ticker: (portfolio[ticker] * prev_prices[ticker]) / portfolio_value
        for ticker in portfolio
    }
    for overweight in portfolio:
        if weights[overweight] <= threshold:
            continue
        extra_value = (weights[overweight] - threshold) * portfolio_value
        new_portfolio[overweight] -= extra_value / curr_prices[overweight]

        remaining_value = extra_value
        for candidate in sorted_tickers:
            if candidate == overweight:
                continue
            candidate_value = new_portfolio[candidate] * curr_prices[candidate]
            candidate_weight = candidate_value / portfolio_value
            if candidate_weight < threshold:
                capacity = (threshold - candidate_weight) * portfolio_value
                allocation = min(remaining_value, capacity)
                new_portfolio[candidate] += allocation / curr_prices[candidate]
                remaining_value -= allocation
                if remaining_value <= 0:
                    break
    return new_portfolio
###############################################
# 8. Rolling OLS Regression for VIX Signal
###############################################
def _split_slope(slope):
    """Return (positive part, negative part) of a slope value; the unused part is 0."""
    slope_pos = slope if slope > 0 else 0
    slope_neg = slope if slope < 0 else 0
    return slope_pos, slope_neg


def rolling_regression_signal(prices, macro_data, current_date, regression_window=100):
    """
    Predict the next SPY return from the VIX term-structure slope via rolling OLS.

    The regression specification is:
        R_{t+1} = alpha + beta * (Slope_positive)_t + gamma * (Slope_negative)_t + eps_t
    where:
        Slope_positive = VIX_Spread_EMA if positive, else 0
        Slope_negative = VIX_Spread_EMA if negative, else 0
    and R_{t+1} is the 'SPY US Equity' return between consecutive dates of
    ``macro_data.index``. Only dates strictly before ``current_date`` are used.

    Args:
        prices: DataFrame of prices containing a 'SPY US Equity' column.
        macro_data: DataFrame containing a 'VIX_Spread_EMA' column.
        current_date: Decision date.
        regression_window: Number of (slope, next return) observations to fit on.

    Returns:
        The predicted return based on the most recent slope before
        ``current_date``, or None when fewer than ``regression_window`` usable
        observations exist (missing price dates and non-finite values are
        skipped).
    """
    # Select available dates up to current_date (need regression_window+1 points).
    available_dates = macro_data.index[macro_data.index < current_date]
    if len(available_dates) < regression_window + 1:
        return None  # Not enough data to run regression

    sample_dates = available_dates[-(regression_window + 1):]
    X = []
    y = []
    for t, t_next in zip(sample_dates[:-1], sample_dates[1:]):
        slope = macro_data.loc[t, 'VIX_Spread_EMA']
        try:
            price_t = prices.loc[t, 'SPY US Equity']
            price_t_next = prices.loc[t_next, 'SPY US Equity']
        except KeyError:
            # Macro date without a matching price date: drop the observation.
            continue
        ret = (price_t_next / price_t) - 1
        # A single NaN/inf row would turn every fitted coefficient into NaN.
        if not (np.isfinite(slope) and np.isfinite(ret)):
            continue
        X.append(list(_split_slope(slope)))
        y.append(ret)
    if len(X) < regression_window:  # not enough data after filtering
        return None

    X = np.array(X)
    y = np.array(y)
    # has_constant='add' always prepends the intercept column; the default
    # ('skip') omits it when a regressor happens to be constant, which would
    # no longer match the 3-column prediction row below.
    X = sm.add_constant(X, has_constant='add')
    model = sm.OLS(y, X).fit(cov_type='HAC', cov_kwds={'maxlags': 1})

    # Use yesterday's slope values for prediction
    slope_pos_yesterday, slope_neg_yesterday = _split_slope(
        macro_data.loc[available_dates[-1], 'VIX_Spread_EMA']
    )
    # Explicit 2-D (1 row x 3 columns) design row for a single prediction.
    X_pred = np.array([[1.0, slope_pos_yesterday, slope_neg_yesterday]])
    return model.predict(X_pred)[0]
def map_predicted_return_to_alloc(pred_return, macro_min_alloc=0.6, max_neg=0.20):
    """
    Map the predicted SPY return to a target allocation.

    If the prediction is missing (None or NaN) or non-negative, allocate 100%
    (risk-on). If negative, scale the allocation linearly from 100% down to
    ``macro_min_alloc``, which is reached once the prediction is ``-max_neg``
    or lower.

    Args:
        pred_return: Predicted return, or None/NaN when no signal exists.
        macro_min_alloc: Floor of the returned allocation.
        max_neg: Magnitude of the negative return at which the floor is hit.

    Returns:
        Allocation between ``macro_min_alloc`` and 1.0.

    Raises:
        ValueError: If ``max_neg`` is not positive.
    """
    # NaN compares False against everything, so it must be caught explicitly;
    # otherwise it would propagate into the returned allocation.
    if pred_return is None or pd.isna(pred_return) or pred_return >= 0:
        return 1.0
    if max_neg <= 0:
        raise ValueError("max_neg must be positive.")
    factor = (1.0 - macro_min_alloc) / max_neg
    alloc = 1.0 - factor * abs(pred_return)
    return max(min(alloc, 1.0), macro_min_alloc)
###############################################
# 9. Helper Functions for Cash (using a cash ticker)
###############################################
def invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker):
    """
    Sell the whole cash-instrument position and reinvest it in the portfolio.

    The proceeds (valued at ``current_date`` prices) are spread over the
    existing positions in proportion to their ``current_date`` values, i.e.
    every quantity is scaled by the same factor. If the positions currently
    have no positive value, the proceeds are split equally across the tickers.

    Args:
        portfolio: Dict mapping ticker -> quantity (not modified).
        prices: DataFrame of prices (date index, one column per ticker).
        current_date: Execution date.
        cash_qty: Quantity of the cash instrument held.
        cash_ticker: Column name of the cash instrument (e.g. SHV).

    Returns:
        Tuple ``(new_portfolio, new_cash_qty, note)``. When there is nothing
        to reinvest, or no position to receive it, the inputs are returned
        unchanged together with an explanatory note.
    """
    cash_price = prices.loc[current_date, cash_ticker]
    cash_available = cash_qty * cash_price
    if cash_available <= 0:
        return portfolio, cash_qty, f"No {cash_ticker} to reinvest."
    if not portfolio:
        # Without positions the proceeds would simply vanish; keep the cash.
        return portfolio, cash_qty, f"No positions to receive {cash_ticker} proceeds."

    mom_value = compute_portfolio_value(portfolio, prices, current_date)
    new_portfolio = portfolio.copy()
    if mom_value > 0:
        for ticker in portfolio:
            new_portfolio[ticker] = (portfolio[ticker] * (mom_value + cash_available)) / mom_value
    else:
        # No value to scale: buy an equal notional of every ticker instead.
        per_ticker_cash = cash_available / len(portfolio)
        for ticker in portfolio:
            new_portfolio[ticker] = portfolio[ticker] + per_ticker_cash / prices.loc[current_date, ticker]
    return new_portfolio, 0.0, f"Reinvested {cash_available:,.2f} from {cash_ticker}"
def allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker):
    """
    Move value between the portfolio and the cash instrument to reach a target.

    The desired cash amount is ``(1 - target_alloc)`` times total AUM measured
    at the previous available date. It is compared with the cash position
    valued at ``current_date`` prices:

    * more cash wanted -> every position is reduced by the same fraction and
      the proceeds buy the cash instrument;
    * less cash wanted -> the excess cash is sold and every position is
      scaled up by the same factor.

    Both directions trade at ``current_date`` prices and are self-financing:
    the value leaving one side equals the value entering the other.

    Args:
        portfolio: Dict mapping ticker -> quantity (not modified).
        prices: DataFrame of prices (date index, one column per ticker).
        current_date: Execution date.
        target_alloc: Desired fraction of AUM held in the portfolio.
        cash_qty: Quantity of the cash instrument held.
        cash_ticker: Column name of the cash instrument.

    Returns:
        Tuple ``(new_portfolio, new_cash_qty, note)``; ``note`` is empty when
        no trade was made.
    """
    prev_date = prices.index[prices.index < current_date][-1]

    # The cash target is sized on the previous date's total AUM.
    prev_value = compute_portfolio_value(portfolio, prices, prev_date)
    total_aum = prev_value + cash_qty * prices.loc[prev_date, cash_ticker]
    desired_cash = (1 - target_alloc) * total_aum

    # Execution uses current-date prices.
    cash_price = prices.loc[current_date, cash_ticker]
    current_cash = cash_qty * cash_price
    curr_value = compute_portfolio_value(portfolio, prices, current_date)

    new_portfolio = portfolio.copy()
    new_cash_qty = cash_qty
    note = ""
    # Proportional scaling is undefined when the positions have no value.
    if not curr_value > 0:
        return new_portfolio, new_cash_qty, note

    if desired_cash > current_cash:
        # Cannot raise more cash than the positions are worth.
        cash_to_raise = min(desired_cash - current_cash, curr_value)
        sell_ratio = cash_to_raise / curr_value
        for ticker in portfolio:
            new_portfolio[ticker] -= portfolio[ticker] * sell_ratio
        new_cash_qty += cash_to_raise / cash_price
        note = f"Raised {cash_to_raise:,.2f} into {cash_ticker}"
    elif desired_cash < current_cash:
        excess_cash = current_cash - desired_cash
        # Grow the portfolio by exactly the cash being sold. Scaling to a
        # previous-date AUM target instead would add a different amount than
        # is removed from cash and silently reset total AUM to yesterday's.
        scaling_factor = (curr_value + excess_cash) / curr_value
        for ticker in portfolio:
            new_portfolio[ticker] = portfolio[ticker] * scaling_factor
        new_cash_qty -= excess_cash / cash_price
        note = f"Deployed {excess_cash:,.2f} from {cash_ticker} into portfolio"
    return new_portfolio, new_cash_qty, note
###############################################
# 10. Simulation: Strategy with Cash & VIX Regression Signal
###############################################
def simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers,
                      initial_aum, start_date, end_date,
                      rebalance_period, rebalance_ratio,
                      lookback_period, metric_type,
                      internal_rebalance_ratios,
                      regression_window=100,
                      cash_ticker='SHV US Equity',
                      macro_min_alloc=0.6,
                      max_alloc_change=0.05):
    """
    Run the daily backtest of the momentum strategy with a cash overlay.

    Per trading day between ``start_date`` and ``end_date``:
      1. A rolling regression on the VIX term structure predicts the SPY
         return; a negative prediction means 'risk-off', anything else
         (including "no signal") means 'risk-on'. The prediction is mapped to
         a target allocation between ``macro_min_alloc`` and 1.0.
      2. Risk-on with cash held: the cash instrument is reinvested.
         Risk-off: value is moved to/from the cash instrument to reach the
         target allocation.
      3. On observation dates the momentum tickers are ranked, rebalanced and
         capped at a 70% weight. If the regime is risk-off but SPY + HYG
         would weigh less than 40%, the day is forced to risk-on and all cash
         is reinvested using the rebalanced weights; otherwise the rebalanced
         portfolio is adopted and the cash split is re-aligned.
      4. A result row with AUM, regime, signal and per-ticker detail is stored.

    Args:
        prices: DataFrame of prices (date index, one column per ticker). It is
            read for 'SPY US Equity', 'HYG US Equity' and ``cash_ticker`` in
            addition to the momentum tickers.
        macro_data: DataFrame with a 'VIX_Spread_EMA' column.
        eq_tickers, fi_tickers, alts_tickers: Ticker lists; their
            concatenation minus ``cash_ticker`` forms the momentum universe.
        initial_aum: Starting notional.
        start_date, end_date: Simulation range (inclusive).
        rebalance_period: Months between momentum rebalances.
        rebalance_ratio: Fraction of portfolio value traded per rebalance.
        lookback_period: Momentum lookback in months.
        metric_type: 'simple' or 'sma'.
        internal_rebalance_ratios: Per-rank trade ratios.
        regression_window: Number of observations in the rolling regression.
        cash_ticker: Instrument used as the cash component.
        macro_min_alloc: Lowest allocation used in risk-off.
        max_alloc_change: Accepted for interface compatibility; not used by
            the current implementation.

    Returns:
        pandas.DataFrame indexed by 'Date' with one row per simulated day.
    """
    all_tickers = eq_tickers + fi_tickers + alts_tickers
    momentum_tickers = [t for t in all_tickers if t != cash_ticker]
    # A set keeps the per-day "is this a rebalance day?" test O(1).
    monthly_dates = set(get_observation_dates(prices, start_date, end_date, rebalance_period))
    daily_dates = prices.index.sort_values()
    daily_dates = daily_dates[(daily_dates >= start_date) & (daily_dates <= end_date)]

    portfolio = initialize_portfolio(prices, start_date, momentum_tickers, initial_aum)
    cash_qty = 0.0
    current_regime = 'risk-on'
    target_alloc = 1.0
    prev_total_aum = initial_aum
    results = []

    for current_date in daily_dates:
        is_rebalance_day = current_date in monthly_dates
        daily_note = "No adjustment"

        # Generate signal using rolling regression on VIX term structure.
        pred_return = rolling_regression_signal(prices, macro_data, current_date, regression_window)
        if pred_return is None:
            # Not enough data; default to risk-on.
            pred_return = 0.0
        current_regime = 'risk-off' if pred_return < 0 else 'risk-on'
        target_alloc = map_predicted_return_to_alloc(pred_return, macro_min_alloc)
        # For reference, record the predicted return and target allocation.
        daily_note += f" | Predicted SPY return: {pred_return:.4f}, Target Alloc: {target_alloc:.2f}"

        # Adjust cash allocation if needed.
        if current_regime == 'risk-on' and cash_qty > 0:
            portfolio, cash_qty, note_update = invest_cash_into_portfolio(
                portfolio, prices, current_date, cash_qty, cash_ticker)
            daily_note += " | " + note_update
        elif current_regime == 'risk-off':
            portfolio, cash_qty, note_update = allocate_cash_from_portfolio(
                portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
            daily_note += " | " + note_update

        # Monthly rebalancing using momentum ranking.
        if is_rebalance_day:
            sorted_tickers, ranks, metrics = rank_assets(
                prices, current_date, momentum_tickers, lookback_period, metric_type)
            temp_portfolio, trades, _pre_rebalance_value = rebalance_portfolio(
                portfolio, prices, current_date, momentum_tickers, sorted_tickers,
                internal_rebalance_ratios, rebalance_ratio)
            temp_portfolio = adjust_overweight(temp_portfolio, prices, current_date,
                                               sorted_tickers, threshold=0.70)
            temp_value = compute_portfolio_value(temp_portfolio, prices, current_date)

            # If the combined weight of SPY and HYG is too low when risk-off, force risk-on.
            spy_temp = temp_portfolio.get('SPY US Equity', 0) * prices.loc[current_date, 'SPY US Equity']
            hyg_temp = temp_portfolio.get('HYG US Equity', 0) * prices.loc[current_date, 'HYG US Equity']
            combined_weight = (spy_temp + hyg_temp) / temp_value if temp_value > 0 else 0

            if current_regime == 'risk-off' and combined_weight < 0.40:
                current_regime = 'risk-on'
                target_alloc = 1.0
                daily_note += " | Monthly: Forced risk-on due to SPY+HYG weight < 40%."
                # Total AUM comes from the pre-rebalance holdings plus cash; it
                # is then fully invested using the rebalanced weights.
                total_aum = (compute_portfolio_value(portfolio, prices, current_date)
                             + cash_qty * prices.loc[current_date, cash_ticker])
                simulated_value = temp_value
                new_portfolio = {}
                for ticker in temp_portfolio:
                    price = prices.loc[current_date, ticker]
                    simulated_weight = ((temp_portfolio[ticker] * price) / simulated_value
                                        if simulated_value > 0 else 1 / len(temp_portfolio))
                    new_portfolio[ticker] = (total_aum * simulated_weight) / price
                portfolio = new_portfolio
                cash_qty = 0
            else:
                portfolio = temp_portfolio
                curr_value = compute_portfolio_value(portfolio, prices, current_date)
                total_aum = curr_value + cash_qty * prices.loc[current_date, cash_ticker]
                desired_value = target_alloc * total_aum
                # Re-align the cash split: either too much is invested, or
                # too little is invested while cash is available to deploy.
                if curr_value > desired_value or (curr_value < desired_value and cash_qty > 0):
                    portfolio, cash_qty, note_update = allocate_cash_from_portfolio(
                        portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                    daily_note += " | Monthly: " + note_update

        current_mom_value = compute_portfolio_value(portfolio, prices, current_date)
        cash_price = prices.loc[current_date, cash_ticker]
        cash_value = cash_qty * cash_price
        total_aum = current_mom_value + cash_value
        ret = (total_aum - prev_total_aum) / prev_total_aum if prev_total_aum > 0 else 0
        prev_total_aum = total_aum

        row = {
            'Date': current_date,
            'Momentum AUM': current_mom_value,
            'Cash Qty': cash_qty,
            'Cash Price': cash_price,
            'Cash Value': cash_value,
            'Total AUM': total_aum,
            'Current Regime': current_regime,
            'Target Alloc': target_alloc,
            'Predicted SPY Return': pred_return,
            'Adjustment Note': daily_note,
            'Return': ret,
            'Event': 'Monthly Rebalance' if is_rebalance_day else 'Daily Check'
        }
        for ticker in momentum_tickers:
            price = prices.loc[current_date, ticker]
            qty = portfolio[ticker]
            notional = qty * price
            row[f'qty_{ticker}'] = qty
            row[f'notional_{ticker}'] = notional
            row[f'weight_{ticker}'] = (notional / current_mom_value) if current_mom_value > 0 else np.nan
            if is_rebalance_day:
                row[f'rank_{ticker}'] = ranks.get(ticker, np.nan)
                row[f'metric_{ticker}'] = metrics.get(ticker, np.nan)
                row[f'trade_{ticker}'] = trades.get(ticker, 0)
            else:
                row[f'rank_{ticker}'] = np.nan
                row[f'metric_{ticker}'] = np.nan
                row[f'trade_{ticker}'] = 0
        results.append(row)

    result_df = pd.DataFrame(results)
    result_df.set_index('Date', inplace=True)
    return result_df
###############################################
# 11. Main – Example Usage
###############################################
if __name__ == '__main__':
    # Example run: five-asset momentum universe with SHV as the cash instrument.
    eq_tickers = ['SPY US Equity']
    fi_tickers = ['TLT US Equity', 'HYG US Equity']
    alts_tickers = ['GLD US Equity', 'IGSB US Equity']
    initial_aum = 100e6
    start_date = pd.to_datetime('2008-01-01')
    end_date = pd.to_datetime('2025-02-01')
    rebalance_period = 1
    rebalance_ratio = 0.2
    lookback_period = 6
    metric_type = 'simple'
    # One ratio per rank (best to worst): buy the top two, sell the bottom two.
    internal_rebalance_ratios = [0.8, 0.2, 0, -0.2, -0.8]
    price_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Codes\Historic Prices.xlsx"
    macro_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Momentum Strategy Overlay Data.xlsx"
    prices = load_price_data(price_filepath)
    macro_data = load_macro_data(macro_filepath)
    result_df = simulate_strategy(prices, macro_data,
                                  eq_tickers, fi_tickers, alts_tickers,
                                  initial_aum, start_date, end_date,
                                  rebalance_period, rebalance_ratio,
                                  lookback_period, metric_type,
                                  internal_rebalance_ratios,
                                  regression_window=100,
                                  cash_ticker='SHV US Equity',
                                  macro_min_alloc=0.6,
                                  max_alloc_change=0.05)
    pd.set_option('display.float_format', lambda x: f'{x:,.2f}')
    # For example, to display the last few rows:
    # print(result_df[['Total AUM', 'Momentum AUM', 'Cash Price']].tail())
# Editor is loading...
# Leave a Comment