# Untitled
# unknown
# plain_text
# 7 months ago
# 25 kB
# 4
# Indexable
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil.relativedelta import relativedelta
from scipy.stats import zscore
###############################################
# 1. Data Loading (Filtering Out Weekends)
###############################################
def load_price_data(filepath, sheet_name="Sheet2"):
    """
    Load historical prices from an Excel file.

    Assumes that the first column is dates and the remaining columns are tickers.
    Rows are sorted chronologically and weekend rows are removed.

    Parameters
    ----------
    filepath : str or path-like
        Workbook location, passed straight to ``pd.read_excel``.
    sheet_name : str or int, optional
        Worksheet holding the price table. Defaults to "Sheet2", the sheet
        this script has always read.

    Returns
    -------
    pandas.DataFrame
        Prices indexed by a sorted DatetimeIndex containing only
        Monday-Friday rows.
    """
    df = pd.read_excel(filepath, sheet_name=sheet_name, index_col=0)
    df.index = pd.to_datetime(df.index)
    df = df.sort_index()
    # Filter out weekends (Saturday=5, Sunday=6)
    return df[df.index.dayofweek < 5]
def load_macro_data(filepath):
    """
    Load macro indicators from an Excel file.

    Reads five columns from the 'Eq2' sheet (labelled VIX, VIX3M, UX1, UX2,
    UX3) and one column from the 'FI2' sheet (labelled LF98TRUU), drops
    weekend rows from each, and joins them on the date index.

    The joined frame is sorted chronologically *before* gaps are filled so
    that the forward-fill always propagates the most recent earlier
    observation. No slopes or signals are computed here.

    Parameters
    ----------
    filepath : str or path-like
        Workbook location, passed straight to ``pd.read_excel``.

    Returns
    -------
    pandas.DataFrame
        Columns VIX, VIX3M, UX1, UX2, UX3, LF98TRUU on a sorted date index.
    """
    # VIX data
    vix_data = pd.read_excel(filepath, sheet_name='Eq2', index_col=0,
                             parse_dates=True, usecols=[0, 4, 5, 6, 7, 8])
    vix_data.columns = ['VIX', 'VIX3M', 'UX1', 'UX2', 'UX3']
    vix_data = vix_data[vix_data.index.dayofweek < 5]

    # High-yield index data
    cdx_data = pd.read_excel(filepath, sheet_name='FI2', index_col=0,
                             parse_dates=True, usecols=[0, 2])
    cdx_data.columns = ['LF98TRUU']
    cdx_data = cdx_data[cdx_data.index.dayofweek < 5]

    combined_data = pd.concat([vix_data, cdx_data], axis=1).sort_index()
    # ffill()/bfill() replace the deprecated fillna(method=...) spelling.
    # NOTE: the trailing bfill() copies the first later observation into any
    # leading gap, i.e. those earliest rows contain values from the future.
    return combined_data.ffill().bfill()
###############################################
# 2. Helper: Observation Dates (Monthly)
###############################################
def get_observation_dates(prices, start_date, end_date, rebalance_period):
    """
    Build the rebalance schedule: the first available trading day of every
    ``rebalance_period``-th month after ``start_date``.

    Starting from ``start_date``, the function repeatedly jumps
    ``rebalance_period`` calendar months ahead, takes the first row of
    ``prices.index`` that falls in that month, records it and continues from
    there. Generation stops as soon as a target month has no trading day in
    ``prices`` or the candidate would fall after ``end_date``.

    Parameters
    ----------
    prices : pandas.DataFrame
        Price table whose DatetimeIndex defines the available trading days.
    start_date, end_date : datetime-like
        Bounds of the schedule; ``end_date`` is inclusive.
    rebalance_period : int
        Number of months between observations; must be a whole number >= 1.

    Returns
    -------
    list of pandas.Timestamp
        Observation dates in chronological order (possibly empty).

    Raises
    ------
    ValueError
        If ``rebalance_period`` is not a whole number of at least 1 (a value
        of 0 would never advance and loop forever).
    """
    if rebalance_period < 1 or int(rebalance_period) != rebalance_period:
        raise ValueError("rebalance_period must be a whole number of months >= 1.")
    step = int(rebalance_period)

    trading_days = prices.index
    if not trading_days.is_monotonic_increasing:
        # searchsorted below requires chronological order.
        trading_days = trading_days.sort_values()

    end_ts = pd.Timestamp(end_date)
    current = pd.Timestamp(start_date)
    dates = []
    while current < end_ts:
        target_month = current.to_period("M") + step
        # First trading day at or after the 1st of the target month.
        pos = trading_days.searchsorted(target_month.start_time)
        if pos >= len(trading_days):
            break
        candidate = trading_days[pos]
        if candidate.to_period("M") != target_month or candidate > end_ts:
            break
        dates.append(candidate)
        current = candidate
    return dates
###############################################
# 3. Portfolio Initialization
###############################################
def initialize_portfolio(prices, date, tickers, initial_aum):
    """
    Create an equal-notional starting portfolio.

    ``initial_aum`` is split evenly across ``tickers`` and each slice is
    converted into a (fractional) quantity at the price on ``date``.

    Parameters
    ----------
    prices : pandas.DataFrame
        Price table indexed by date with one column per ticker.
    date : datetime-like
        Row of ``prices`` used to size the positions.
    tickers : sequence of str
        Assets to hold.
    initial_aum : float
        Total capital to invest.

    Returns
    -------
    dict
        Mapping ticker -> quantity held.

    Raises
    ------
    ValueError
        If ``tickers`` is empty, or a ticker has a missing or non-positive
        price on ``date`` (which would otherwise yield NaN/inf quantities
        that silently poison every later valuation).
    """
    if len(tickers) == 0:
        raise ValueError("Cannot initialize a portfolio without tickers.")
    allocation = initial_aum / len(tickers)
    portfolio = {}
    for ticker in tickers:
        price = prices.loc[date, ticker]
        if pd.isna(price) or price <= 0:
            raise ValueError(
                f"No usable price for {ticker} on {date}; cannot size the initial position.")
        portfolio[ticker] = allocation / price
    return portfolio
###############################################
# 4. Lookback Metric Computation
###############################################
def compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type='simple'):
    """
    Compute a momentum metric for ``ticker`` using only data strictly before
    ``current_date``.

    The reference price is the last observation before ``current_date``
    (``prev_date``); the lookback price is the last observation at or before
    ``prev_date`` minus ``lookback_period`` calendar months.

    Parameters
    ----------
    prices : pandas.DataFrame
        Price table indexed by date with one column per ticker.
    current_date : datetime-like
        Decision date; its own price is never used.
    ticker : str
        Column of ``prices`` to evaluate.
    lookback_period : int
        Lookback length in months.
    metric_type : {'simple', 'sma'}
        'simple' -> reference price / lookback price - 1.
        'sma'    -> relative distance of the reference price from the mean
                    price over [lookback date, prev_date].

    Returns
    -------
    float
        The momentum metric.

    Raises
    ------
    ValueError
        For an unknown ``metric_type``, when there is no observation before
        ``current_date``, or when either price is missing.
    """
    if metric_type not in ('simple', 'sma'):
        raise ValueError("Invalid metric type. Choose 'simple' or 'sma'.")

    # Only the ticker's own series is needed; sort it only when necessary
    # instead of re-sorting the whole frame on every call.
    series = prices[ticker]
    if not series.index.is_monotonic_increasing:
        series = series.sort_index()

    history = series.index[series.index < current_date]
    if len(history) == 0:
        raise ValueError(f"No price data for {ticker} before {current_date}.")
    prev_date = history[-1]
    lookback_date = prev_date - pd.DateOffset(months=lookback_period)

    current_price = series.asof(prev_date)
    lookback_price = series.asof(lookback_date)
    if pd.isna(current_price) or pd.isna(lookback_price):
        raise ValueError(f"Missing price data for {ticker} on {prev_date} or {lookback_date}.")

    if metric_type == 'simple':
        return (current_price / lookback_price) - 1

    # The window ends at prev_date: including current_date's price would
    # leak information that is not yet known when the signal is formed.
    window = series.loc[lookback_date:prev_date]
    if window.empty:
        raise ValueError(f"No price data for {ticker} between {lookback_date} and {prev_date}.")
    sma = window.mean()
    return (current_price - sma) / sma
###############################################
# 5. Ranking Assets by Momentum
###############################################
def rank_assets(prices, current_date, tickers, lookback_period, metric_type):
    """
    Score every ticker with ``compute_lookback_metric`` and rank them.

    Returns
    -------
    tuple
        ``(sorted_tickers, ranks, metrics)`` where ``sorted_tickers`` lists
        the tickers from highest to lowest metric, ``ranks`` maps each ticker
        to its 1-based position in that list, and ``metrics`` maps each
        ticker to its raw metric value.
    """
    metrics = {
        ticker: compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type)
        for ticker in tickers
    }
    # Highest metric first; ties keep the order in which tickers were scored.
    sorted_tickers = sorted(metrics, key=metrics.get, reverse=True)
    ranks = {ticker: position for position, ticker in enumerate(sorted_tickers, start=1)}
    return sorted_tickers, ranks, metrics
###############################################
# 6. Compute Current Portfolio Value
###############################################
def compute_portfolio_value(portfolio, prices, current_date):
    """
    Mark the portfolio to market on ``current_date``.

    ``portfolio`` maps ticker -> quantity; each quantity is multiplied by the
    ticker's price in the ``current_date`` row of ``prices`` and the products
    are summed. An empty portfolio is worth 0.
    """
    return sum(
        quantity * prices.loc[current_date, ticker]
        for ticker, quantity in portfolio.items()
    )
###############################################
# 7. Rebalance the Momentum Portfolio
###############################################
def rebalance_portfolio(portfolio, prices, current_date, tickers, sorted_tickers,
                        internal_rebalance_ratios, rebalance_ratio):
    """
    Shift notional from low-ranked to high-ranked assets.

    The portfolio is valued at the last prices before ``current_date``; that
    value times ``rebalance_ratio`` is the rebalance budget. The i-th ticker
    of ``sorted_tickers`` gets a target trade of budget *
    ``internal_rebalance_ratios[i]`` (negative = sell). Sells are capped at
    the notional actually held at ``current_date`` prices, and whatever was
    really sold is distributed over the buy targets pro rata. All trades are
    executed at ``current_date`` prices.

    Returns
    -------
    tuple
        ``(new_portfolio, actual_trades, portfolio_value)``: the updated
        ticker -> quantity mapping, the signed traded notional per ticker,
        and the pre-trade value at the previous date's prices.
    """
    earlier_dates = prices.index[prices.index < current_date]
    prev_prices = prices.loc[earlier_dates[-1]]
    curr_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in tickers)
    rebalance_amount = portfolio_value * rebalance_ratio
    target_trades = {}
    for position, ticker in enumerate(sorted_tickers):
        target_trades[ticker] = rebalance_amount * internal_rebalance_ratios[position]

    # Sell leg: never sell more than is held.
    actual_trades = {}
    total_sold = 0
    for ticker, target in target_trades.items():
        if not target < 0:
            actual_trades[ticker] = 0
            continue
        held_notional = portfolio[ticker] * curr_prices[ticker]
        sold = min(held_notional, abs(target))
        actual_trades[ticker] = -sold
        total_sold += sold

    # Buy leg: spend exactly the sale proceeds, split by target size.
    total_buy_target = sum(target for target in target_trades.values() if target > 0)
    if total_buy_target > 0:
        for ticker, target in target_trades.items():
            if target > 0:
                actual_trades[ticker] = total_sold * (target / total_buy_target)

    new_portfolio = portfolio.copy()
    for ticker, notional in actual_trades.items():
        new_portfolio[ticker] += notional / curr_prices[ticker]
    return new_portfolio, actual_trades, portfolio_value
def adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70):
    """
    Trim any position whose weight exceeds ``threshold`` and redistribute.

    Weights are measured at the last prices before ``current_date``. For an
    overweight position, the notional above the threshold is sold at
    ``current_date`` prices and handed to the other assets in
    ``sorted_tickers`` order, filling each one up to the threshold before
    moving on to the next, until the proceeds are used up.

    Returns
    -------
    dict
        A new ticker -> quantity mapping; ``portfolio`` itself is not
        modified.
    """
    earlier_dates = prices.index[prices.index < current_date]
    prev_prices = prices.loc[earlier_dates[-1]]
    curr_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[ticker] * prev_prices[ticker] for ticker in portfolio)
    weights = {
        ticker: (portfolio[ticker] * prev_prices[ticker]) / portfolio_value
        for ticker in portfolio
    }

    adjusted = portfolio.copy()
    for holder, weight in weights.items():
        if not weight > threshold:
            continue

        # Sell the slice above the cap.
        extra_value = (weight - threshold) * portfolio_value
        adjusted[holder] -= extra_value / curr_prices[holder]

        # Hand the proceeds to the best-ranked assets that still have room.
        remaining_value = extra_value
        for candidate in sorted_tickers:
            if candidate == holder:
                continue
            candidate_weight = (adjusted[candidate] * curr_prices[candidate]) / portfolio_value
            if candidate_weight < threshold:
                room = (threshold - candidate_weight) * portfolio_value
                allocation = min(remaining_value, room)
                adjusted[candidate] += allocation / curr_prices[candidate]
                remaining_value -= allocation
            if remaining_value <= 0:
                break
    return adjusted
###############################################
# 8. Risk Signal Function
###############################################
def generate_risk_signals(current_date, macro_data, prices, portfolio):
    """
    Derive the risk regime and the target risk allocation for ``current_date``.

    Only information dated strictly before ``current_date`` is used (falling
    back to ``current_date`` itself when no earlier row exists).

    Signal source
    -------------
    When ``macro_data`` carries the derived columns 'Signal_Momentum',
    'Slope5D', 'Slope10D' and 'Slope15D' (as added by ``simulate_strategy``),
    those are used. Otherwise the function falls back to the raw futures
    levels ('UX3' for the momentum signal, 'UX1' for the slopes), which is
    what the earlier placeholder implementation always did.

    Allocation rule
    ---------------
    signal > 0 means 'risk-off'; a signal of 3 targets 60% risk, 1 targets
    80%, anything else 100%. A risk-off signal is overridden to risk-on /
    100% when SPY + HYG together make up less than 40% of the portfolio.

    Parameters
    ----------
    current_date : datetime-like
        Decision date.
    macro_data : pandas.DataFrame
        Must contain 'UX1', 'UX2' and 'UX3'; derived columns are optional.
    prices : pandas.DataFrame
        Price table used to weigh the current holdings.
    portfolio : dict
        Mapping ticker -> quantity.

    Returns
    -------
    tuple
        ``(regime, target_alloc, vix_signal, note, vix_params)``.
    """
    # Get the most recent macro data before current_date
    available_macro_dates = macro_data.index[macro_data.index < current_date]
    ref_date = available_macro_dates[-1] if len(available_macro_dates) > 0 else current_date

    def macro_value(preferred, fallback):
        # Prefer the derived indicator; fall back to the raw column so frames
        # without derived columns keep working.
        column = preferred if preferred in macro_data.columns else fallback
        return macro_data[column].asof(ref_date)

    ux1_ux2_spread = macro_data['UX1'].asof(ref_date) - macro_data['UX2'].asof(ref_date)
    slope_5d = macro_value('Slope5D', 'UX1')
    slope_10d = macro_value('Slope10D', 'UX1')
    slope_15d = macro_value('Slope15D', 'UX1')
    vix_mom_signal = macro_value('Signal_Momentum', 'UX3')

    # Determine signal based on momentum signal
    vix_signal = 'risk-off' if vix_mom_signal > 0 else 'risk-on'
    if vix_mom_signal == 3:
        vix_target_alloc = 0.6
    elif vix_mom_signal == 1:
        vix_target_alloc = 0.8
    else:
        vix_target_alloc = 1.0

    # Weigh SPY and HYG using the previous trading day's prices. On the very
    # first price date there is no earlier row, so use current_date itself
    # (mirrors the macro fallback above) instead of raising IndexError.
    earlier_price_dates = prices.index[prices.index < current_date]
    prev_date = earlier_price_dates[-1] if len(earlier_price_dates) > 0 else current_date
    prev_prices = prices.loc[prev_date]
    mom_value = sum(qty * prev_prices[ticker] for ticker, qty in portfolio.items())

    def weight_of(ticker):
        # Only touch the price column when the asset is actually held, so a
        # universe without SPY/HYG does not fail with KeyError.
        if mom_value > 0 and ticker in portfolio:
            return (portfolio[ticker] * prev_prices[ticker]) / mom_value
        return 0

    spy_weight = weight_of('SPY US Equity')
    hyg_weight = weight_of('HYG US Equity')

    note = ""
    # Override logic based on SPY+HYG weights
    if vix_signal == 'risk-off':
        if (spy_weight + hyg_weight) < 0.40:
            regime = 'risk-on'
            target_alloc = 1
            note = "Forced regime to risk-on & target alloc 100% due to SPY+HYG < 40%"
        else:
            regime = 'risk-off'
            target_alloc = vix_target_alloc
    else:
        regime = 'risk-on'
        target_alloc = 1.0

    vix_params = {
        'UX1_UX2_Spread': ux1_ux2_spread,
        'slope_5d': slope_5d,
        'slope_10d': slope_10d,
        'slope_15d': slope_15d,
        'vix_mom_signal': vix_mom_signal,
        'vix_target_alloc': vix_target_alloc,
        'spy_weight': spy_weight,
        'hyg_weight': hyg_weight
    }
    return regime, target_alloc, vix_signal, note, vix_params
###############################################
# 9. Helper Functions for Cash
###############################################
def invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker):
    """
    Liquidate the whole cash position and reinvest it in the portfolio.

    The cash holding is valued at the ``current_date`` price of
    ``cash_ticker``. If the portfolio currently has positive value, every
    position is scaled up by the same factor so existing weights are kept.
    If it is worth nothing, the cash is split into equal notionals across
    the portfolio's tickers. Everything is executed at ``current_date``
    prices, so total value is unchanged by the transfer.

    Parameters
    ----------
    portfolio : dict
        Mapping ticker -> quantity.
    prices : pandas.DataFrame
        Price table containing the portfolio tickers and ``cash_ticker``.
    current_date : datetime-like
        Execution date.
    cash_qty : float
        Units of ``cash_ticker`` held.
    cash_ticker : str
        Column of ``prices`` used as the cash proxy.

    Returns
    -------
    tuple
        ``(new_portfolio, new_cash_qty, note)``. When there is nothing to
        invest (or nothing to invest into) the inputs are returned
        unchanged.
    """
    curr_prices = prices.loc[current_date]
    cash_available = cash_qty * curr_prices[cash_ticker]
    if cash_available <= 0:
        return portfolio, cash_qty, f"No {cash_ticker} to reinvest."
    if len(portfolio) == 0:
        # Nothing to buy: keep the cash rather than silently dropping it.
        return portfolio, cash_qty, f"No positions to reinvest {cash_ticker} into."

    mom_value = sum(qty * curr_prices[ticker] for ticker, qty in portfolio.items())
    new_portfolio = portfolio.copy()
    if mom_value > 0:
        growth = (mom_value + cash_available) / mom_value
        for ticker, qty in portfolio.items():
            new_portfolio[ticker] = qty * growth
    else:
        # No existing weights to preserve: buy equal notionals instead.
        per_position = cash_available / len(portfolio)
        for ticker, qty in portfolio.items():
            new_portfolio[ticker] = qty + per_position / curr_prices[ticker]
    return new_portfolio, 0.0, f"Reinvested {cash_available:,.2f} from {cash_ticker}"
def allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker):
    """
    Move value between the portfolio and the cash proxy to reach
    ``target_alloc``.

    The desired cash amount is ``(1 - target_alloc)`` times total AUM as of
    the previous trading day (``current_date`` itself when no earlier price
    row exists). Trades are executed at ``current_date`` prices:

    * too much cash  -> the excess is withdrawn and invested pro rata in the
      existing positions (equal notionals if the portfolio is worth nothing);
    * too little cash -> every position is reduced by the same fraction,
      capped at the portfolio's current value, and the proceeds are added to
      cash.

    Both directions are self-financing: the value removed from one side is
    exactly the value added to the other, so the day's P&L is preserved.

    Parameters
    ----------
    portfolio : dict
        Mapping ticker -> quantity.
    prices : pandas.DataFrame
        Price table containing the portfolio tickers and ``cash_ticker``.
    current_date : datetime-like
        Execution date.
    target_alloc : float
        Desired fraction of AUM held in the portfolio.
    cash_qty : float
        Units of ``cash_ticker`` currently held.
    cash_ticker : str
        Column of ``prices`` used as the cash proxy.

    Returns
    -------
    tuple
        ``(new_portfolio, new_cash_qty, note)``; ``note`` is empty when no
        trade was made.
    """
    # Use previous day's snapshot to compute total AUM
    earlier_dates = prices.index[prices.index < current_date]
    prev_date = earlier_dates[-1] if len(earlier_dates) > 0 else current_date
    prev_prices = prices.loc[prev_date]
    curr_prices = prices.loc[current_date]

    prev_value = sum(qty * prev_prices[ticker] for ticker, qty in portfolio.items())
    total_aum = prev_value + cash_qty * prev_prices[cash_ticker]

    # Determine desired cash based on target allocation
    desired_cash = (1 - target_alloc) * total_aum
    cash_price = curr_prices[cash_ticker]
    current_cash = cash_qty * cash_price
    current_value = sum(qty * curr_prices[ticker] for ticker, qty in portfolio.items())

    new_portfolio = portfolio.copy()
    new_cash_qty = cash_qty
    note = ""

    if desired_cash < current_cash:
        # Excess cash: invest exactly what is withdrawn from cash.
        excess_cash = current_cash - desired_cash
        if current_value > 0:
            scaling_factor = (current_value + excess_cash) / current_value
            for ticker, qty in portfolio.items():
                new_portfolio[ticker] = qty * scaling_factor
        elif len(portfolio) > 0:
            per_position = excess_cash / len(portfolio)
            for ticker, qty in portfolio.items():
                new_portfolio[ticker] = qty + per_position / curr_prices[ticker]
        else:
            # Nothing to buy; leave the cash where it is.
            return new_portfolio, new_cash_qty, note
        new_cash_qty -= excess_cash / cash_price
        note = f"Deployed {excess_cash:,.2f} from {cash_ticker} into portfolio"
    elif desired_cash > current_cash:
        # Cash shortfall: sell the same fraction of every position, never
        # more than the portfolio is worth.
        cash_to_raise = min(desired_cash - current_cash, current_value)
        if cash_to_raise > 0:
            sell_ratio = cash_to_raise / current_value
            for ticker, qty in portfolio.items():
                new_portfolio[ticker] = qty - qty * sell_ratio
            new_cash_qty += cash_to_raise / cash_price
            note = f"Raised {cash_to_raise:,.2f} into {cash_ticker}"
    return new_portfolio, new_cash_qty, note
###############################################
# 10. Simulation: Strategy
###############################################
def _add_macro_signals(macro_data):
    """Return a copy of macro_data extended with spread, EMA, slope and momentum-signal columns."""
    enriched = macro_data.copy()
    enriched['UX1_UX2_Spread'] = enriched['UX1'] - enriched['UX2']

    # (raw column, smoothed column, suffix used for the derived columns)
    sources = (
        ('UX1_UX2_Spread', 'UX1_UX2_SpreadEMA_12', ''),
        ('LF98TRUU', 'LF98TRUU_EMA_12', '_HY'),
    )
    windows = (5, 10, 15)
    for raw_column, ema_column, suffix in sources:
        enriched[ema_column] = enriched[raw_column].ewm(span=12, adjust=False).mean()
        for window in windows:
            enriched[f'Slope{window}D{suffix}'] = enriched[ema_column].diff(window)

    # Each slope votes +1 (rising) or -1 (not rising); the signal is the sum
    # of the three votes and therefore one of -3, -1, 1, 3.
    for _, _, suffix in sources:
        enriched[f'Signal_Momentum{suffix}'] = sum(
            (enriched[f'Slope{window}D{suffix}'] > 0).astype(int) * 2 - 1
            for window in windows
        )
    return enriched


def simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers,
                      initial_aum, start_date, end_date,
                      rebalance_period, rebalance_ratio,
                      lookback_period, metric_type,
                      internal_rebalance_ratios,
                      cash_ticker='SHV US Equity'):
    """
    Run the daily backtest of the momentum portfolio with a risk overlay.

    The portfolio starts equal-weighted across all tickers except
    ``cash_ticker`` on the first price date at or after ``start_date``. For
    every price date in [start_date, end_date]:

    1. ``generate_risk_signals`` yields the regime and target allocation.
    2. On scheduled observation dates the portfolio is re-ranked, rebalanced
       and capped (``rank_assets``, ``rebalance_portfolio``,
       ``adjust_overweight``); the cash split is then adjusted if the target
       allocation changed and the deviation exceeds 0.1% of AUM.
    3. On other dates a regime or allocation change moves value between the
       portfolio and the cash proxy.
    4. A result row is logged from the holdings as they stood *before* the
       day's trades, valued at the day's prices.

    Parameters
    ----------
    prices : pandas.DataFrame
        Price table with all strategy tickers and ``cash_ticker``.
    macro_data : pandas.DataFrame
        Must contain 'UX1', 'UX2' and 'LF98TRUU'; derived signal columns are
        added to a copy, the caller's frame is not modified.
    eq_tickers, fi_tickers, alts_tickers : list of str
        Strategy universe by sleeve.
    initial_aum : float
        Starting capital.
    start_date, end_date : datetime-like
        Simulation window (inclusive).
    rebalance_period : int
        Months between observation dates.
    rebalance_ratio : float
        Fraction of portfolio value reshuffled on each observation date.
    lookback_period : int
        Momentum lookback in months.
    metric_type : {'simple', 'sma'}
        Momentum metric passed to ``rank_assets``.
    internal_rebalance_ratios : sequence of float
        Per-rank share of the rebalance budget (negative = sell).
    cash_ticker : str
        Column of ``prices`` used as the cash proxy.

    Returns
    -------
    pandas.DataFrame
        One row per simulated date, indexed by 'Date'.

    Raises
    ------
    ValueError
        If ``prices`` has no date at or after ``start_date``.
    """
    all_tickers = eq_tickers + fi_tickers + alts_tickers
    momentum_tickers = [t for t in all_tickers if t != cash_ticker]

    # A set makes the per-day "is this an observation date?" test O(1).
    monthly_dates = set(get_observation_dates(prices, start_date, end_date, rebalance_period))
    daily_dates = prices.index.sort_values()
    daily_dates = daily_dates[(daily_dates >= start_date) & (daily_dates <= end_date)]

    macro_data = _add_macro_signals(macro_data)

    available_dates = prices.index[prices.index >= start_date]
    if len(available_dates) == 0:
        raise ValueError("No trading dates found after the specified start_date")
    start_date = available_dates[0]

    portfolio = initialize_portfolio(prices, start_date, momentum_tickers, initial_aum)
    cash_qty = 0.0
    current_regime, target_alloc = 'risk-on', 1.0
    previous_regime, previous_target_alloc = current_regime, target_alloc
    prev_total_aum = initial_aum
    results = []

    for current_date in daily_dates:
        # Save snapshot of the holdings before today's trades
        prev_portfolio = portfolio.copy()
        prev_cash_qty = cash_qty
        ranks, metrics, trades = {}, {}, {}
        is_monthly = current_date in monthly_dates

        # --- Generate regime & allocation ---
        regime, target_alloc, vix_signal, signal_note, vix_params = generate_risk_signals(
            current_date, macro_data, prices, portfolio)
        current_regime = regime
        daily_note = signal_note

        # --- Monthly Rebalancing Logic ---
        if is_monthly:
            sorted_tickers, ranks, metrics = rank_assets(
                prices, current_date, momentum_tickers, lookback_period, metric_type)
            temp_portfolio, trades, _ = rebalance_portfolio(
                portfolio, prices, current_date, momentum_tickers, sorted_tickers,
                internal_rebalance_ratios, rebalance_ratio)
            temp_portfolio = adjust_overweight(
                temp_portfolio, prices, current_date, sorted_tickers, threshold=0.70)

            # Re-check the SPY+HYG override on the post-rebalance weights.
            temp_value = compute_portfolio_value(temp_portfolio, prices, current_date)
            spy_temp = temp_portfolio.get('SPY US Equity', 0) * prices.loc[current_date, 'SPY US Equity']
            hyg_temp = temp_portfolio.get('HYG US Equity', 0) * prices.loc[current_date, 'HYG US Equity']
            combined_weight = (spy_temp + hyg_temp) / temp_value if temp_value > 0 else 0
            if (current_regime == 'risk-off') and (combined_weight < 0.40):
                current_regime = 'risk-on'
                target_alloc = 1
                daily_note += " | Monthly: Forced risk-on due to SPY+HYG weight < 40% after simulation."
            portfolio = temp_portfolio

            if target_alloc != previous_target_alloc:
                actual_value = compute_portfolio_value(portfolio, prices, current_date)
                total_aum_temp = actual_value + cash_qty * prices.loc[current_date, cash_ticker]
                desired_value = target_alloc * total_aum_temp
                if abs(actual_value - desired_value) / total_aum_temp > 0.001:
                    portfolio, cash_qty, note_update = allocate_cash_from_portfolio(
                        portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                    daily_note += " | Monthly: " + note_update
                else:
                    daily_note += " | Monthly: Skipped cash adjustment due to minor deviation."
            else:
                daily_note += " | Monthly"
        # --- Daily Regime Handling: Only if NOT monthly date ---
        elif (previous_regime != current_regime) or \
                (current_regime == 'risk-off' and target_alloc != previous_target_alloc):
            if previous_regime == 'risk-off' and current_regime == 'risk-on' and cash_qty > 0:
                portfolio, cash_qty, note_update = invest_cash_into_portfolio(
                    portfolio, prices, current_date, cash_qty, cash_ticker)
                daily_note += " | " + note_update
            elif (previous_regime == 'risk-on' and current_regime == 'risk-off') or \
                    (current_regime == 'risk-off' and target_alloc != previous_target_alloc):
                portfolio, cash_qty, note_update = allocate_cash_from_portfolio(
                    portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                daily_note += " | " + note_update

        previous_regime = current_regime
        previous_target_alloc = target_alloc

        # --- Log AUM and daily stats using the pre-trade snapshot (prev_portfolio, prev_cash_qty) ---
        cash_price = prices.loc[current_date, cash_ticker]
        cash_value = prev_cash_qty * cash_price
        momentum_value = sum(
            prev_portfolio[ticker] * prices.loc[current_date, ticker]
            for ticker in prev_portfolio
        )
        total_aum = momentum_value + cash_value
        ret = (total_aum - prev_total_aum) / prev_total_aum if prev_total_aum > 0 else 0
        prev_total_aum = total_aum

        row = {
            'Total AUM': total_aum,
            'Momentum AUM': momentum_value,
            # Logged from the same snapshot as 'Cash Value' so that
            # Cash Value == Cash Qty * Cash Price holds on every row.
            'Cash Qty': prev_cash_qty,
            'Cash Price': cash_price,
            'Cash Value': cash_value,
            'Current Regime': current_regime,
            'Target Alloc': target_alloc,
            'VIX Target': vix_params.get('vix_target_alloc', np.nan),
            'VIX Signal': vix_signal,
            'Adjustment Note': daily_note,
            'Cash Adjustment': 0.0,
            'Return': ret,
            'Event': 'Monthly Rebalance' if is_monthly else 'Daily Check',
            'Slope_5D': vix_params.get('slope_5d', np.nan),
            'Slope_10D': vix_params.get('slope_10d', np.nan),
            'Slope_15D': vix_params.get('slope_15d', np.nan),
            'VIX_Mom_Signal': vix_params.get('vix_mom_signal', np.nan),
            'Date': current_date
        }
        for ticker in momentum_tickers:
            qty = prev_portfolio.get(ticker, 0)
            price = prices.loc[current_date, ticker]
            notional = qty * price
            row[f'qty_{ticker}'] = qty
            row[f'price_{ticker}'] = price
            row[f'notional_{ticker}'] = notional
            row[f'weight_{ticker}'] = notional / momentum_value if momentum_value > 0 else np.nan
            row[f'rank_{ticker}'] = ranks.get(ticker, np.nan)
            row[f'metric_{ticker}'] = metrics.get(ticker, np.nan)
            row[f'trade_{ticker}'] = trades.get(ticker, 0)
        results.append(row)

    result_df = pd.DataFrame(results)
    result_df.set_index('Date', inplace=True)
    return result_df
###############################################
# 11. Restructure Results for Output
###############################################
def restructure_results(result_df, momentum_tickers):
    """
    Return ``result_df`` with its columns in report order.

    Order: portfolio-level summary columns, then the per-ticker price, qty,
    notional, weight, rank, metric and trade columns (each group in
    ``momentum_tickers`` order), then the slope/signal diagnostics. Columns
    missing from ``result_df`` are skipped; columns not named here are
    dropped.
    """
    ordered = ['Total AUM', 'Momentum AUM', 'Cash Value', 'Cash Qty', 'Cash Price', 'Return',
               'Current Regime', 'Target Alloc', 'VIX Target', 'VIX Signal',
               'Adjustment Note', 'Event']
    for prefix in ('price', 'qty', 'notional', 'weight', 'rank', 'metric', 'trade'):
        ordered.extend(f'{prefix}_{ticker}' for ticker in momentum_tickers)
    ordered.extend(['Slope_5D', 'Slope_10D', 'Slope_15D', 'VIX_Mom_Signal'])

    present = [column for column in ordered if column in result_df.columns]
    return result_df[present]
###############################################
# 12. Main – Example Usage
###############################################
if __name__ == '__main__':
    # --- Universe, grouped by sleeve ---
    eq_tickers = ['SPY US Equity']
    fi_tickers = ['TLT US Equity', 'HYG US Equity']
    alts_tickers = ['GLD US Equity', 'IGSB US Equity']

    # --- Backtest settings ---
    initial_aum = 100e6
    start_date = pd.to_datetime('2008-01-01')
    end_date = pd.to_datetime('2025-02-01')
    rebalance_period = 1
    rebalance_ratio = 0.2
    lookback_period = 6
    metric_type = 'simple'
    # One entry per rank, best to worst; negative entries are sells.
    internal_rebalance_ratios = [0.8, 0.2, 0, -0.2, -0.8]

    # --- Input workbooks ---
    price_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Codes\Historic Prices.xlsx"
    macro_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Momentum Strategy Overlay Data.xlsx"
    prices = load_price_data(price_filepath)
    macro_data = load_macro_data(macro_filepath)

    # --- Run the simulation ---
    result_df = simulate_strategy(
        prices, macro_data,
        eq_tickers, fi_tickers, alts_tickers,
        initial_aum, start_date, end_date,
        rebalance_period, rebalance_ratio,
        lookback_period, metric_type,
        internal_rebalance_ratios,
        cash_ticker='SHV US Equity',
    )

    # --- Report ---
    pd.set_option('display.float_format', lambda x: f'{x:,.2f}')
    all_tickers = eq_tickers + fi_tickers + alts_tickers
    momentum_tickers = list(all_tickers)
    result_df = restructure_results(result_df, momentum_tickers)
    # For example, print a summary of the simulation:
    summary_columns = ['Total AUM', 'Momentum AUM', 'Cash Value']
    print(result_df[summary_columns].tail())
# Editor is loading...
# Leave a Comment