# Untitled
#
# Paste-site metadata (not part of the program):
#   avatar: unknown
#   format: plain_text
#   posted: a month ago
#   size: 25 kB
#   4
#   Indexable
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil.relativedelta import relativedelta
import statsmodels.api as sm
from scipy.stats import zscore

###############################################
# 1. Data Loading (Filtering Out Weekends)
###############################################

def load_price_data(filepath):
    """
    Read a table of historical prices from an Excel workbook.

    The first column of the sheet becomes the index and is converted with
    ``pd.to_datetime``; the remaining columns are kept as-is (presumably one
    column per ticker -- verify against the workbook).

    Returns the rows in ascending index order with Saturdays and Sundays
    (``dayofweek`` 5 and 6) removed.
    """
    raw = pd.read_excel(filepath, index_col=0)
    raw.index = pd.to_datetime(raw.index)
    ordered = raw.sort_index()
    # Monday=0 ... Friday=4; anything >= 5 is a weekend row.
    is_weekday = ordered.index.dayofweek < 5
    return ordered[is_weekday]

def load_macro_data(filepath):
    """
    Load the VIX term-structure series and macro indicators from an Excel file.

    - 'Eq' sheet: column 0 is the date index; columns 4 and 5 are renamed to
      'VIX' and 'VIX3M'.
    - 'Macro' sheet: first 8 columns, skipping the first row; column 0 is the
      date index. The sheet must contain "CESIUSD Index", "INJCJC Index" and
      ".HG/GC G Index" (a KeyError is raised otherwise).

    Weekend rows are dropped and both sheets are sorted chronologically
    *before* slopes (first differences) and fills are computed, so an
    unsorted sheet cannot produce wrong differences or fill values from the
    wrong neighbour.

    Returns a DataFrame indexed by date containing the VIX columns, the macro
    columns, the three "... Slope" columns, 'VIX_Spread' (VIX - VIX3M) and
    'VIX_Spread_EMA' (span-5 exponential moving average of the spread).

    NOTE: leading gaps are back-filled, which copies later observations onto
    earlier dates; this is kept from the original behaviour.
    """
    # VIX data
    vix_data = pd.read_excel(filepath, sheet_name='Eq', index_col=0, parse_dates=True, usecols=[0, 4, 5])
    vix_data.columns = ['VIX', 'VIX3M']
    vix_data = vix_data[vix_data.index.dayofweek < 5].sort_index()

    # Macro data (assumed to include columns "CESIUSD Index", "INJCJC Index", ".HG/GC G Index", "Consumer Confidence")
    macro_data = pd.read_excel(filepath, sheet_name='Macro', index_col=0, parse_dates=True, usecols=range(8), skiprows=1)
    macro_data = macro_data[macro_data.index.dayofweek < 5].sort_index()

    # Slopes are simple first differences between consecutive (sorted) rows.
    macro_data["Surprise Index Slope"] = macro_data["CESIUSD Index"].diff()
    macro_data["Jobless Claims Slope"] = macro_data["INJCJC Index"].diff()
    macro_data["Copper Gold Slope"] = macro_data['.HG/GC G Index'].diff()

    # Combine on the union of dates, order chronologically, then fill gaps.
    # DataFrame.ffill()/bfill() replace the deprecated fillna(method=...).
    combined_data = pd.concat([vix_data, macro_data], axis=1).sort_index()
    combined_data = combined_data.ffill().bfill()

    # Compute VIX term structure and its EMA
    combined_data['VIX_Spread'] = combined_data['VIX'] - combined_data['VIX3M']
    combined_data['VIX_Spread_EMA'] = combined_data['VIX_Spread'].ewm(span=5, adjust=False).mean()

    return combined_data

###############################################
# 2. Helper: Observation Dates (Monthly)
###############################################

def get_observation_dates(prices, start_date, end_date, rebalance_period):
    """
    Build the list of rebalance (observation) dates.

    Starting from ``start_date``, repeatedly jump ``rebalance_period`` months
    ahead, snap to the 1st of that month and walk forward day by day until a
    date present in ``prices.index`` is found. The search stops -- and the
    whole schedule ends -- if the walk leaves the target month or the date
    found is after ``end_date``.

    Returns a list of the dates found, in chronological order.
    """
    observation_dates = []
    anchor = start_date
    while anchor < end_date:
        target = anchor + relativedelta(months=rebalance_period)
        probe = target.replace(day=1)
        # Advance to the first date of the target month that has prices.
        while probe not in prices.index:
            probe += pd.Timedelta(days=1)
            if probe.month != target.month:
                probe = None
                break
        if probe is None or probe > end_date:
            break
        observation_dates.append(probe)
        anchor = probe
    return observation_dates

###############################################
# 3. Portfolio Initialization
###############################################

def initialize_portfolio(prices, date, tickers, initial_aum):
    """
    Create an equal-weighted starting portfolio.

    ``initial_aum`` is split evenly across ``tickers`` and converted into
    quantities using the prices on the last index date strictly before
    ``date``.

    Returns a dict mapping ticker -> quantity held.
    Raises IndexError when ``prices`` has no date before ``date``.
    """
    history = prices.index[prices.index < date]
    entry_date = history[-1]
    per_asset_notional = initial_aum / len(tickers)
    return {name: per_asset_notional / prices.loc[entry_date, name] for name in tickers}

###############################################
# 4. Lookback Metric Computation
###############################################

def compute_lookback_metric(prices, current_date, ticker, lookback_period, metric_type='simple'):
    """
    Compute a momentum metric for ``ticker`` using only data before ``current_date``.

    The reference ("current") price is the one on the last index date strictly
    before ``current_date``; the lookback date is that date minus
    ``lookback_period`` months. Both prices are looked up with ``asof`` so the
    most recent available observation is used.

    metric_type:
      - 'simple': current_price / lookback_price - 1
      - 'sma':    (current_price - SMA) / SMA, where SMA is the mean price
                  between the lookback date and the previous date (inclusive).
                  The window deliberately ends at the previous date, not at
                  ``current_date``, so the metric never uses a price that is
                  not yet known (no look-ahead), matching ``current_price``.

    Raises:
      ValueError: missing price, empty SMA window, or unknown ``metric_type``.
      IndexError: no index date exists before ``current_date``.
    """
    # Only this ticker's series is needed; sorting it avoids re-sorting the
    # whole price frame on every call (asof/loc slicing need a sorted index).
    series = prices[ticker].sort_index()
    earlier_dates = series.index[series.index < current_date]
    prev_date = earlier_dates[-1]
    lookback_date = prev_date - relativedelta(months=lookback_period)

    current_price = series.asof(prev_date)
    lookback_price = series.asof(lookback_date)

    if pd.isna(current_price) or pd.isna(lookback_price):
        raise ValueError(f"Missing price data for {ticker} on {prev_date} or {lookback_date}.")
    if metric_type == 'simple':
        metric = (current_price / lookback_price) - 1
    elif metric_type == 'sma':
        # Label slicing is inclusive on both ends, so stop at prev_date.
        window = series.loc[lookback_date:prev_date]
        if window.empty:
            raise ValueError(f"No price data for {ticker} between {lookback_date} and {prev_date}.")
        sma = window.mean()
        metric = (current_price - sma) / sma
    else:
        raise ValueError("Invalid metric type. Choose 'simple' or 'sma'.")
    return metric

###############################################
# 5. Ranking Assets by Momentum
###############################################

def rank_assets(prices, current_date, tickers, lookback_period, metric_type):
    """
    Rank ``tickers`` by their lookback metric, best first.

    Returns a tuple ``(sorted_tickers, ranks, metrics)``:
      - sorted_tickers: tickers ordered by descending metric,
      - ranks: ticker -> 1-based position in that ordering,
      - metrics: ticker -> metric value from ``compute_lookback_metric``.
    """
    metrics = {
        name: compute_lookback_metric(prices, current_date, name, lookback_period, metric_type)
        for name in tickers
    }
    sorted_tickers = sorted(metrics, key=lambda name: metrics[name], reverse=True)
    ranks = {name: position for position, name in enumerate(sorted_tickers, start=1)}
    return sorted_tickers, ranks, metrics

###############################################
# 6. Compute Current Portfolio Value
###############################################

def compute_portfolio_value(portfolio, prices, current_date):
    """
    Mark the portfolio to market on ``current_date``.

    ``portfolio`` maps ticker -> quantity; each quantity is multiplied by the
    price in ``prices`` at (``current_date``, ticker) and the products are
    summed. An empty portfolio is worth 0.
    """
    return sum(
        quantity * prices.loc[current_date, ticker]
        for ticker, quantity in portfolio.items()
    )

###############################################
# 7. Rebalance the Momentum Portfolio
###############################################

def rebalance_portfolio(portfolio, prices, current_date, tickers, sorted_tickers,
                        internal_rebalance_ratios, rebalance_ratio):
    """
    Rotate part of the portfolio from low-ranked into high-ranked assets.

    The portfolio is valued at the prices of the last date before
    ``current_date``; ``rebalance_ratio`` of that value is the notional to
    rotate. The i-th ticker in ``sorted_tickers`` gets a target trade of
    ``notional * internal_rebalance_ratios[i]`` (negative = sell).

    Sells are capped at the position's notional at ``current_date`` prices.
    The total actually sold is then distributed to the buy targets in
    proportion to their target size. All trades execute at ``current_date``
    prices.

    Returns ``(new_portfolio, actual_trades, portfolio_value)`` where
    ``actual_trades`` maps ticker -> signed traded notional and
    ``portfolio_value`` is the pre-trade value at previous-date prices.
    """
    earlier_dates = prices.index[prices.index < current_date]
    last_close = prices.loc[earlier_dates[-1]]
    exec_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[name] * last_close[name] for name in tickers)
    notional_to_rotate = portfolio_value * rebalance_ratio

    target_trades = {}
    for position, name in enumerate(sorted_tickers):
        target_trades[name] = notional_to_rotate * internal_rebalance_ratios[position]

    # Sell side: never sell more than the position is worth today.
    actual_trades = {}
    total_sold = 0
    for name, target in target_trades.items():
        if target < 0:
            proceeds = min(portfolio[name] * exec_prices[name], abs(target))
            actual_trades[name] = -proceeds
            total_sold += proceeds
        else:
            actual_trades[name] = 0

    # Buy side: share out the realised proceeds pro rata to the buy targets.
    buy_targets = {name: target for name, target in target_trades.items() if target > 0}
    total_buy_target = sum(buy_targets.values())
    if total_buy_target > 0:
        for name, target in buy_targets.items():
            share = target / total_buy_target
            actual_trades[name] = total_sold * share

    new_portfolio = portfolio.copy()
    for name, notional in actual_trades.items():
        new_portfolio[name] += notional / exec_prices[name]

    return new_portfolio, actual_trades, portfolio_value

def adjust_overweight(portfolio, prices, current_date, sorted_tickers, threshold=0.70):
    """
    Trim any position whose weight exceeds ``threshold`` and redistribute it.

    Weights and total value are measured at the prices of the last date
    before ``current_date``. For each position above ``threshold`` the excess
    value is sold at ``current_date`` prices and handed to the other assets in
    ``sorted_tickers`` order, filling each one up to ``threshold`` (its weight
    here is measured at ``current_date`` prices against the same total value)
    until the excess is used up.

    Returns a new ticker -> quantity dict; ``portfolio`` itself is not mutated.
    """
    earlier_dates = prices.index[prices.index < current_date]
    ref_prices = prices.loc[earlier_dates[-1]]
    exec_prices = prices.loc[current_date]

    portfolio_value = sum(portfolio[name] * ref_prices[name] for name in portfolio)
    weights = {
        name: (portfolio[name] * ref_prices[name]) / portfolio_value
        for name in portfolio
    }

    adjusted = portfolio.copy()
    for heavy in portfolio:
        if not weights[heavy] > threshold:
            continue

        surplus_weight = weights[heavy] - threshold
        surplus_value = surplus_weight * portfolio_value
        adjusted[heavy] -= surplus_value / exec_prices[heavy]

        unplaced = surplus_value
        for receiver in sorted_tickers:
            if receiver == heavy:
                continue
            receiver_value = adjusted[receiver] * exec_prices[receiver]
            receiver_weight = receiver_value / portfolio_value
            if receiver_weight < threshold:
                # Room left before this asset itself reaches the cap.
                room = (threshold - receiver_weight) * portfolio_value
                moved = min(unplaced, room)
                adjusted[receiver] += moved / exec_prices[receiver]
                unplaced -= moved
                if unplaced <= 0:
                    break
    return adjusted

###############################################
# 8. Rolling OLS Regression for VIX Signal
###############################################

def rolling_regression_signal(prices, macro_data, current_date, regression_window=100):
    """
    Predict the next SPY return from the VIX term-structure slope via rolling OLS.

    Model fitted on the most recent ``regression_window`` consecutive date
    pairs strictly before ``current_date``:

       R_{t+1} = alpha + beta * max-part(slope_t) + gamma * min-part(slope_t) + eps

    where slope_t is ``macro_data['VIX_Spread_EMA']`` at t, the positive part
    is the slope when it is > 0 (else 0) and the negative part is the slope
    when it is < 0 (else 0). R_{t+1} is the 'SPY US Equity' return between
    consecutive macro dates; pairs whose dates are missing from ``prices``
    are skipped.

    Returns the fitted model's prediction for the latest available slope, or
    None when there are fewer than ``regression_window + 1`` macro dates or
    fewer than ``regression_window`` usable observations.
    """
    def _split_slope(value):
        # (positive part, negative part); zero and non-comparable values map to 0.
        return (value if value > 0 else 0), (value if value < 0 else 0)

    history = macro_data.index[macro_data.index < current_date]
    if len(history) < regression_window + 1:
        return None  # Not enough data to run regression

    sample_dates = history[-(regression_window + 1):]

    features = []
    targets = []
    for t, t_next in zip(sample_dates[:-1], sample_dates[1:]):
        slope_pos, slope_neg = _split_slope(macro_data.loc[t, 'VIX_Spread_EMA'])
        try:
            price_t = prices.loc[t, 'SPY US Equity']
            price_t_next = prices.loc[t_next, 'SPY US Equity']
        except KeyError:
            # No price on one of the two dates: drop this observation.
            continue
        features.append([slope_pos, slope_neg])
        targets.append((price_t_next / price_t) - 1)

    if len(features) < regression_window:  # not enough data after filtering
        return None

    design = sm.add_constant(np.array(features))
    model = sm.OLS(np.array(targets), design).fit(cov_type='HAC', cov_kwds={'maxlags':1})

    # Predict from the most recent slope known before current_date.
    latest_pos, latest_neg = _split_slope(macro_data.loc[history[-1], 'VIX_Spread_EMA'])
    return model.predict(np.array([1, latest_pos, latest_neg]))[0]

def map_predicted_return_to_alloc(pred_return, macro_min_alloc=0.6, max_neg=0.20):
    """
    Map the predicted SPY return to a target allocation.

    - No usable signal (None or NaN) or a non-negative prediction -> 1.0
      (fully invested). NaN is treated as "no signal" so that a NaN
      prediction cannot turn into a NaN allocation downstream.
    - Negative prediction -> linear scale from 1.0 (at 0) down to
      ``macro_min_alloc`` (at ``-max_neg`` or lower).

    Args:
      pred_return: predicted return, or None/NaN when unavailable.
      macro_min_alloc: floor for the allocation.
      max_neg: magnitude of negative return at which the floor is reached
        (default 0.20, i.e. -20%). Must be positive.

    Returns the target allocation as a float in [macro_min_alloc, 1.0].
    Raises ValueError when ``max_neg`` is not positive.
    """
    if pred_return is None or pd.isna(pred_return) or pred_return >= 0:
        return 1.0
    if max_neg <= 0:
        raise ValueError("max_neg must be positive.")
    factor = (1.0 - macro_min_alloc) / max_neg
    alloc = 1.0 - factor * abs(pred_return)
    # Clamp into [macro_min_alloc, 1.0].
    return max(min(alloc, 1.0), macro_min_alloc)

###############################################
# 9. Helper Functions for Cash (using a cash ticker)
###############################################

def invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker):
    """
    Sell the whole cash-instrument position and reinvest it into the portfolio.

    The cash position is valued at the ``current_date`` price of
    ``cash_ticker``. Proceeds are spread across existing positions in
    proportion to their ``current_date`` market value (every quantity is
    scaled by the same factor). If the portfolio currently has no market
    value, the proceeds are split equally by value across its tickers.

    Returns ``(new_portfolio, new_cash_qty, note)``:
      - nothing to reinvest (cash value <= 0), or no tickers to invest into:
        the inputs are returned unchanged with an explanatory note;
      - otherwise the scaled portfolio, a cash quantity of 0.0 and a note
        with the reinvested amount.
    """
    cash_price = prices.loc[current_date, cash_ticker]
    cash_available = cash_qty * cash_price
    if cash_available <= 0:
        return portfolio, cash_qty, f"No {cash_ticker} to reinvest."
    if not portfolio:
        # Nowhere to put the proceeds: keep the cash instead of dropping it.
        return portfolio, cash_qty, f"No positions to reinvest {cash_ticker} into."

    mom_value = compute_portfolio_value(portfolio, prices, current_date)
    if mom_value > 0:
        new_portfolio = {
            ticker: (qty * (mom_value + cash_available)) / mom_value
            for ticker, qty in portfolio.items()
        }
    else:
        # No value to scale from: buy an equal notional of every ticker.
        per_asset_notional = cash_available / len(portfolio)
        new_portfolio = {
            ticker: qty + per_asset_notional / prices.loc[current_date, ticker]
            for ticker, qty in portfolio.items()
        }
    return new_portfolio, 0.0, f"Reinvested {cash_available:,.2f} from {cash_ticker}"

def allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker):
    """
    Move value between the risky portfolio and the cash instrument so that
    cash approaches ``(1 - target_alloc)`` of total AUM.

    Total AUM (portfolio + cash) is measured at the prices of the last date
    before ``current_date``; the current cash value and all quantity changes
    use ``current_date`` prices.

    - If more cash is wanted than currently held, every position is reduced
      by the same fraction and the raised amount is added to the cash quantity.
    - If less cash is wanted, every position is scaled by one common factor so
      the portfolio's ``current_date`` value equals ``total_aum * target_alloc``,
      and the excess is removed from the cash quantity.
    - If they are equal, nothing changes and the note is empty.

    Returns ``(new_portfolio, new_cash_qty, note)``; ``portfolio`` is not mutated.

    NOTE(review): the target is sized from previous-day AUM but executed at
    current-day prices, so the deploy branch does not necessarily conserve
    value -- confirm this is intended.
    NOTE(review): both branches divide by the portfolio's current value and
    will fail if it is zero.
    """
    # Last available date strictly before current_date.
    mask = prices.index < current_date
    prev_date = prices.index[mask][-1]
    curr_value = compute_portfolio_value(portfolio, prices, prev_date)
    
    # Total AUM valued at previous-date prices.
    cash_price = prices.loc[prev_date, cash_ticker]
    cash_equiv = cash_qty * cash_price
    total_aum = curr_value + cash_equiv
    
    desired_cash = (1 - target_alloc) * total_aum
    # From here on cash_price / cash_equiv refer to current_date prices.
    cash_price = prices.loc[current_date, cash_ticker]
    cash_equiv = cash_qty * cash_price
    current_cash = cash_equiv
    
    new_portfolio = portfolio.copy()
    new_cash_qty = cash_qty
    note = ""
    
    if desired_cash > current_cash:
        # Raise cash: sell the same fraction of every position.
        cash_to_raise = desired_cash - current_cash
        curr_value = compute_portfolio_value(portfolio, prices, current_date)
        for ticker in portfolio:
            price = prices.loc[current_date, ticker]  # looked up but not used below
            sell_ratio = (cash_to_raise / curr_value)
            qty_to_sell = portfolio[ticker] * sell_ratio
            new_portfolio[ticker] -= qty_to_sell
        new_cash_qty += cash_to_raise / cash_price
        note = f"Raised {cash_to_raise:,.2f} into {cash_ticker}"
    
    elif desired_cash < current_cash:
        # Deploy cash: rescale positions to the desired risky notional.
        current_portfolio_value = sum(
            portfolio[ticker] * prices.loc[current_date, ticker] 
            for ticker in portfolio
        )
        desired_risk_allocation = total_aum * target_alloc
        scaling_factor = desired_risk_allocation / current_portfolio_value
        for ticker in portfolio:
            new_portfolio[ticker] = portfolio[ticker] * scaling_factor
        excess_cash = current_cash - desired_cash
        new_cash_qty -= excess_cash / cash_price
        note = f"Deployed {excess_cash:,.2f} from {cash_ticker} into portfolio"
    
    return new_portfolio, new_cash_qty, note

###############################################
# 10. Simulation: Strategy with Cash & VIX Regression Signal
###############################################

def simulate_strategy(prices, macro_data, eq_tickers, fi_tickers, alts_tickers,
                      initial_aum, start_date, end_date,
                      rebalance_period, rebalance_ratio,
                      lookback_period, metric_type,
                      internal_rebalance_ratios,
                      regression_window=100,
                      cash_ticker='SHV US Equity',
                      macro_min_alloc=0.6,
                      max_alloc_change=0.05):
    """
    Run the daily back-test of the momentum strategy with a cash overlay.

    Per trading day in [start_date, end_date]:
      1. A rolling OLS regression on the VIX term structure
         (``rolling_regression_signal``) predicts the SPY return; when no
         prediction is available it is treated as 0.0.
      2. A negative prediction sets the regime to 'risk-off', otherwise
         'risk-on'; the prediction is mapped to a target allocation with
         ``map_predicted_return_to_alloc``.
      3. In risk-on with a cash position, the cash is reinvested; in risk-off
         the portfolio/cash split is adjusted towards the target allocation.
      4. On observation dates (``get_observation_dates``) the momentum
         portfolio is re-ranked, rebalanced and capped at 70% per asset. If
         the regime is risk-off and SPY+HYG would weigh less than 40%, the
         regime is forced to risk-on and all cash is reinvested at the
         rebalanced weights; otherwise the cash split is re-applied.
      5. A result row (AUM, cash, regime, per-ticker quantities, notionals,
         weights, ranks, metrics and trades) is recorded.

    The ``cash_ticker`` is excluded from momentum ranking/trading and must be
    a column of ``prices``, as must 'SPY US Equity' and 'HYG US Equity'.

    Returns a DataFrame indexed by 'Date' with one row per simulated day.

    NOTE(review): ``max_alloc_change`` is accepted but never referenced in
    this function body.
    """
    all_tickers = eq_tickers + fi_tickers + alts_tickers
    momentum_tickers = [t for t in all_tickers if t != cash_ticker]
    
    monthly_dates = get_observation_dates(prices, start_date, end_date, rebalance_period)
    daily_dates = prices.index.sort_values()
    daily_dates = daily_dates[(daily_dates >= start_date) & (daily_dates <= end_date)]
    
    # Start equal-weighted and fully invested, with no cash position.
    portfolio = initialize_portfolio(prices, start_date, momentum_tickers, initial_aum)
    cash_qty = 0.0
    current_regime = 'risk-on'
    target_alloc = 1.0
    prev_total_aum = initial_aum

    results = []

    for current_date in daily_dates:
        daily_note = "No adjustment"
        
        # Generate signal using rolling regression on VIX term structure.
        pred_return = rolling_regression_signal(prices, macro_data, current_date, regression_window)
        if pred_return is None:
            # Not enough data; default to risk-on.
            pred_return = 0.0
        # Map predicted return to a target allocation.
        if pred_return < 0:
            current_regime = 'risk-off'
        else:
            current_regime = 'risk-on'
        target_alloc = map_predicted_return_to_alloc(pred_return, macro_min_alloc)
        
        # For reference, record the predicted return and target allocation.
        daily_note += f" | Predicted SPY return: {pred_return:.4f}, Target Alloc: {target_alloc:.2f}"
        
        # NOTE(review): prev_date and mom_value are computed here but not read
        # later in this loop body.
        mask = prices.index < current_date
        prev_date = prices.index[mask][-1]
        mom_value = compute_portfolio_value(portfolio, prices, current_date)
        
        # Adjust cash allocation if needed.
        if current_regime == 'risk-on' and cash_qty > 0:
            portfolio, cash_qty, note_update = invest_cash_into_portfolio(portfolio, prices, current_date, cash_qty, cash_ticker)
            daily_note += " | " + note_update
        elif current_regime == 'risk-off':
            portfolio, cash_qty, note_update = allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
            daily_note += " | " + note_update
        
        # Monthly rebalancing using momentum ranking.
        if current_date in monthly_dates:
            sorted_tickers, ranks, metrics = rank_assets(prices, current_date, momentum_tickers, lookback_period, metric_type)
            # Build the rebalanced portfolio first without committing it.
            temp_portfolio, trades, pre_rebalance_value = rebalance_portfolio(portfolio, prices, current_date, momentum_tickers, sorted_tickers, internal_rebalance_ratios, rebalance_ratio)
            temp_portfolio = adjust_overweight(temp_portfolio, prices, current_date, sorted_tickers, threshold=0.70)
            temp_value = compute_portfolio_value(temp_portfolio, prices, current_date)
            # Example: if the combined weight of SPY and HYG is too low when risk-off, force risk-on.
            spy_temp = temp_portfolio.get('SPY US Equity', 0) * prices.loc[current_date, 'SPY US Equity']
            hyg_temp = temp_portfolio.get('HYG US Equity', 0) * prices.loc[current_date, 'HYG US Equity']
            combined_weight = (spy_temp + hyg_temp) / temp_value if temp_value > 0 else 0

            if current_regime == 'risk-off' and combined_weight < 0.40:
                current_regime = 'risk-on'
                target_alloc = 1.0
                daily_note += " | Monthly: Forced risk-on due to SPY+HYG weight < 40%."
                # Total AUM uses the pre-rebalance portfolio plus cash; it is
                # then spread over the rebalanced (temp) weights.
                total_aum = compute_portfolio_value(portfolio, prices, current_date) + cash_qty * prices.loc[current_date, cash_ticker]
                simulated_value = temp_value
                new_portfolio = {}
                for ticker in temp_portfolio:
                    price = prices.loc[current_date, ticker]
                    simulated_weight = (temp_portfolio[ticker] * price) / simulated_value if simulated_value > 0 else 1/len(temp_portfolio)
                    new_qty = (total_aum * simulated_weight) / price
                    new_portfolio[ticker] = new_qty
                portfolio = new_portfolio
                cash_qty = 0
            else:
                # Commit the rebalanced portfolio, then re-apply the cash split.
                portfolio = temp_portfolio
                curr_value = compute_portfolio_value(portfolio, prices, current_date)
                total_aum = curr_value + cash_qty * prices.loc[current_date, cash_ticker]
                desired_value = target_alloc * total_aum
                if curr_value > desired_value:
                    portfolio, cash_qty, note_update = allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                    daily_note += " | Monthly: " + note_update
                elif curr_value < desired_value and cash_qty > 0:
                    portfolio, cash_qty, note_update = allocate_cash_from_portfolio(portfolio, prices, current_date, target_alloc, cash_qty, cash_ticker)
                    daily_note += " | Monthly: " + note_update

        # End-of-day valuation and daily return versus the previous day's AUM.
        current_mom_value = compute_portfolio_value(portfolio, prices, current_date)
        cash_price = prices.loc[current_date, cash_ticker]
        cash_value = cash_qty * cash_price
        total_aum = current_mom_value + cash_value
        ret = (total_aum - prev_total_aum) / prev_total_aum if prev_total_aum > 0 else 0
        prev_total_aum = total_aum

        row = {
            'Date': current_date,
            'Momentum AUM': current_mom_value,
            'Cash Qty': cash_qty,
            'Cash Price': cash_price,
            'Cash Value': cash_value,
            'Total AUM': total_aum,
            'Current Regime': current_regime,
            'Target Alloc': target_alloc,
            'Predicted SPY Return': pred_return,
            'Adjustment Note': daily_note,
            'Return': ret,
            'Event': 'Monthly Rebalance' if current_date in monthly_dates else 'Daily Check'
        }

        # Per-ticker detail; rank/metric/trade are only filled on rebalance days.
        for ticker in momentum_tickers:
            price = prices.loc[current_date, ticker]
            qty = portfolio[ticker]
            notional = qty * price
            row[f'qty_{ticker}'] = qty
            row[f'notional_{ticker}'] = notional
            row[f'weight_{ticker}'] = (notional / current_mom_value) if current_mom_value > 0 else np.nan
            if current_date in monthly_dates:
                row[f'rank_{ticker}'] = ranks.get(ticker, np.nan)
                row[f'metric_{ticker}'] = metrics.get(ticker, np.nan)
                row[f'trade_{ticker}'] = trades.get(ticker, 0)
            else:
                row[f'rank_{ticker}'] = np.nan
                row[f'metric_{ticker}'] = np.nan
                row[f'trade_{ticker}'] = 0

        results.append(row)

    result_df = pd.DataFrame(results)
    result_df.set_index('Date', inplace=True)
    return result_df

###############################################
# 11. Main – Example Usage
###############################################

if __name__ == '__main__':
    # Investable universe, grouped by asset class.
    eq_tickers = ['SPY US Equity']
    fi_tickers = ['TLT US Equity', 'HYG US Equity']
    alts_tickers = ['GLD US Equity', 'IGSB US Equity']

    # Back-test window and starting capital.
    initial_aum = 100e6
    start_date = pd.to_datetime('2008-01-01')
    end_date = pd.to_datetime('2025-02-01')

    # Momentum settings: months between rebalances, share of the portfolio
    # rotated per rebalance, lookback in months and the metric flavour.
    rebalance_period = 1
    rebalance_ratio = 0.2
    lookback_period = 6
    metric_type = 'simple'

    # Share of the rotated notional per rank, best-ranked asset first.
    internal_rebalance_ratios = [0.8, 0.2, 0, -0.2, -0.8]

    # Input workbooks.
    price_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Codes\Historic Prices.xlsx"
    macro_filepath = r"\\asiapac.nom\data\MUM\IWM\India_IWM_IPAS\Reet\Momentum Strategy\Momentum Strategy Overlay Data.xlsx"

    prices = load_price_data(price_filepath)
    macro_data = load_macro_data(macro_filepath)

    result_df = simulate_strategy(
        prices, macro_data,
        eq_tickers, fi_tickers, alts_tickers,
        initial_aum, start_date, end_date,
        rebalance_period, rebalance_ratio,
        lookback_period, metric_type,
        internal_rebalance_ratios,
        regression_window=100,
        cash_ticker='SHV US Equity',
        macro_min_alloc=0.6,
        max_alloc_change=0.05,
    )

    # Show floats with thousands separators and two decimals.
    pd.set_option('display.float_format', lambda x: f'{x:,.2f}')
    # For example, to display the last few rows:
    # print(result_df[['Total AUM', 'Momentum AUM', 'Cash Price']].tail())
# Paste-site footer (not part of the program): "Editor is loading..." / "Leave a Comment"