Untitled

 avatar
unknown
plain_text
a year ago
18 kB
7
Indexable
import pandas as pd
import numpy as np
import warnings
import itertools

# Filter out the SettingWithCopyWarning
warnings.filterwarnings('ignore')
import matplotlib.pyplot as plt
from IPython.display import display


def max_dd(returns):
    """Locate the deepest peak-to-trough decline of a strategy.

    Args:
        returns : (pandas.Series) Daily returns of the strategy, noncumulative.
            The index labels must provide ``strftime`` (they are formatted
            as dates in the result).

    Returns:
        (dict): ``"Max Drawdown"`` is the worst drawdown in percent, rounded
            to two decimals; ``"Start"`` and ``"End"`` are the peak and the
            trough dates formatted as ``MM/DD/YYYY``.
    """
    equity = (1 + returns).cumprod()
    # Distance of every point from the highest equity value seen so far.
    drawdown = equity / equity.cummax() - 1

    trough = drawdown.idxmin()
    # The decline starts at the highest equity value reached up to the trough.
    peak = equity.loc[:trough].idxmax()
    worst = drawdown.min()

    return {"Max Drawdown": round(worst * 100, 2),
            "Start": peak.strftime("%m/%d/%Y"),
            "End": trough.strftime("%m/%d/%Y")}


def period_volatility(x, annualization_factor=252):
    """
    Annualised volatility of a return series.

    Args:
        x : (pandas.Series) Daily returns of the strategy, noncumulative.

        annualization_factor : int (default 252), number of periods per year
            (252 for daily data, 52 for weekly, 12 for monthly,
            4 for quarterly).

    Returns:
        (float) Standard deviation of ``x`` as computed by ``np.std``, scaled
        by the square root of ``annualization_factor``.
    """
    per_period_std = np.std(x)
    scale = annualization_factor ** 0.5
    return per_period_std * scale


def period_downside_volatility(x, annualization_factor=252):
    """
    Annualised volatility of the losing periods only.

    Args:
        x : (pandas.Series) Daily returns of the strategy, noncumulative.

        annualization_factor : int (default 252), number of periods per year
            (252 for daily data, 52 for weekly, 12 for monthly,
            4 for quarterly).

    Returns:
        (float) Standard deviation (``np.std``) of the strictly negative
        entries of ``x``, scaled by the square root of
        ``annualization_factor``.
    """
    losses = x[x < 0]
    scale = annualization_factor ** 0.5
    return np.std(losses) * scale


def cumulative_return(x):
    """
    Compound a series of simple returns into one total return.

    Args:
        x : (pandas.Series) Daily returns of the strategy, noncumulative.

    Returns:
        (float) Product of ``1 + x`` over all periods, minus 1.
    """
    growth = np.prod(x + 1)
    return growth - 1


def period_return(x,
                  annualize=True,
                  annualization_factor=252):
    """
    Total or annualised compounded return of a strategy.

    Args:
        x : (pandas.Series) Daily returns of the strategy, noncumulative.
        annualize: boolean, True to annualise the compounded return, False to
            return the plain cumulative return.
        annualization_factor : int (default 252), number of periods per year
            (252 for daily data, 52 for weekly, 12 for monthly,
            4 for quarterly).

    Returns:
        (float) Return
    """
    total = cumulative_return(x)
    if not annualize:
        return total

    # Length of the sample expressed in years.
    years = len(x) / annualization_factor
    return (1 + total) ** (1 / years) - 1


def period_sharpe_ratio(x,
                        annualize=True,
                        annualization_factor=252):
    """
    Return-to-volatility (Sharpe) ratio of a strategy.

    Args:
        x : (pandas.Series) Daily returns of the strategy, noncumulative.
        annualize: boolean, forwarded to ``period_return``; the volatility in
            the denominator is scaled by ``annualization_factor`` either way.
        annualization_factor : int (default 252), number of periods per year
            (252 for daily data, 52 for weekly, 12 for monthly,
            4 for quarterly).

    Returns:
        (float) ``period_return`` divided by ``period_volatility``.
    """
    reward = period_return(x, annualize=annualize, annualization_factor=annualization_factor)
    risk = period_volatility(x, annualization_factor=annualization_factor)
    sharpe = reward / risk
    return sharpe


def sortino_ratio(x, annualize=True, annualization_factor=252):
    """
    Return-to-downside-volatility (Sortino) ratio of a strategy.

    Args:
        x : (pandas.Series) Daily returns of the strategy, noncumulative.
        annualize: boolean, forwarded to ``period_return``; the downside
            volatility is scaled by ``annualization_factor`` either way.
        annualization_factor : int (default 252), number of periods per year
            (252 for daily data, 52 for weekly, 12 for monthly,
            4 for quarterly).

    Returns:
        (float) ``period_return`` divided by ``period_downside_volatility``.
    """
    reward = period_return(x, annualize=annualize, annualization_factor=annualization_factor)
    downside_risk = period_downside_volatility(x, annualization_factor=annualization_factor)
    sortino = reward / downside_risk
    return sortino


def calmar_ratio(x, annualize=True, annualization_factor=252):
    """
    Return-to-maximum-drawdown (Calmar) ratio of a strategy.

    Args:
        x : (pandas.Series) Daily returns of the strategy, noncumulative.
        annualize: boolean, forwarded to ``period_return``.
        annualization_factor : int (default 252), number of periods per year
            (252 for daily data, 52 for weekly, 12 for monthly,
            4 for quarterly).

    Returns:
        (float) Return in percent divided by the absolute maximum drawdown in
        percent, or NaN when the drawdown reported by ``max_dd`` is not
        negative.
    """
    drawdown_pct = max_dd(x)["Max Drawdown"]

    # Without a negative drawdown the ratio is undefined.
    if not (drawdown_pct < 0):
        return np.nan

    return_pct = period_return(x, annualize=annualize, annualization_factor=annualization_factor) * 100
    return return_pct / abs(drawdown_pct)


####

def iqr_indicator(x):
    """Return the interquartile range (75th minus 25th percentile) of ``x``."""
    lower, upper = np.percentile(x, [25, 75])
    return upper - lower


def qcd_indicator(x):
    """Return the quartile coefficient of dispersion of ``x``.

    Computed as ``(Q3 - Q1) / (Q3 + Q1)`` from the 25th and 75th percentiles.
    """
    lower, upper = np.percentile(x, [25, 75])
    spread = upper - lower
    return spread / (upper + lower)


def cv_indicator(x):
    """Return the coefficient of variation of ``x`` (``np.std`` over ``np.mean``)."""
    dispersion = np.std(x)
    centre = np.mean(x)
    return dispersion / centre


def generate_simulation_params_list(param_dict):
    """Expand a grid of parameter values into one dict per combination.

    Args:
        param_dict: (dict) maps each parameter name to an iterable of the
            values to try for it.

    Returns:
        (list[dict]) the Cartesian product of all value lists, each
        combination keyed by the parameter names. The last parameter varies
        fastest.
    """
    names = list(param_dict)
    grid = itertools.product(*param_dict.values())
    return [dict(zip(names, combo)) for combo in grid]


def calculate_metrics(x):
    """Bundle the headline metrics of a return series into one Series.

    Args:
        x : (pandas.Series) Daily returns of the strategy, noncumulative.

    Returns:
        (pandas.Series) indexed by 'Sharpe', 'CAGR', 'Calmar', 'Sortino',
        'MaxDD' and 'Vol'. 'CAGR' holds the cumulative (not annualised)
        return in percent; 'MaxDD' and 'Vol' are in percent as well.
    """
    metrics = {}
    metrics['Sharpe'] = period_sharpe_ratio(x)
    metrics['CAGR'] = cumulative_return(x) * 100
    metrics['Calmar'] = calmar_ratio(x)
    metrics['Sortino'] = sortino_ratio(x)
    metrics['MaxDD'] = max_dd(x)['Max Drawdown']
    metrics['Vol'] = period_volatility(x) * 100
    return pd.Series(metrics)


def simulation_results(backtest_results, list_params, eop_freq='Y', weekdays=False):
    """Summarise one backtest run as a single-row table of metrics.

    Args:
        backtest_results: (pandas.DataFrame) backtest output with a datetime
            index and at least the columns ``quantity``, ``signals`` and
            ``total_nav``. The frame is modified in place: the helper columns
            ``positions``, ``signal_diff``, ``buysell`` and ``return`` are
            added to it.
        list_params: (dict) parameters of this simulation; the string forms of
            its values joined with ``_`` become the simulation key.
        eop_freq: (str) pandas frequency alias used to group the daily returns
            for the ``EOP_*`` metrics. Trade counts are always grouped by 'Y'.
        weekdays: (bool) when truthy, returns of every calendar day are kept;
            otherwise Saturdays and Sundays are dropped.

    Returns:
        (tuple) ``(final_df, daily_returns)``. ``final_df`` is a one-row
        DataFrame indexed by the simulation key that concatenates the ITD
        metrics, the per-period ``EOP_*`` metrics, the ``Monthly_RET`` returns
        and the trade counts. ``daily_returns`` is the daily return Series
        named after the simulation key.
    """
    sim_key = '_'.join([str(x) for x in list_params.values()])

    # Assign the filled column back rather than calling fillna(inplace=True)
    # on a column selection: the chained in-place form warns on recent pandas
    # and does not update the frame under Copy-on-Write.
    backtest_results['positions'] = backtest_results['quantity'].diff().fillna(0)

    backtest_results['signal_diff'] = backtest_results['signals'].ffill().fillna(0).astype(int).diff()

    # Count, per calendar year, the rows whose signal differs from the previous row.
    # NOTE(review): the first row's signal_diff is NaN and NaN != 0 is True, so
    # the first row is always counted - confirm this is intended.
    n_trades = pd.DataFrame(
        backtest_results.groupby(pd.Grouper(freq='Y')).apply(lambda x: len(x[(x['signal_diff'] != 0)])))
    n_trades.columns = ['trades']
    n_trades['year'] = n_trades.index.year.astype(str)
    n_trades['metric'] = "EOY_Trades"
    n_trades = n_trades.set_index(['metric', 'year'])[['trades']].T
    n_trades.index = [sim_key]
    n_trades[('ITD', 'Trades')] = n_trades.sum(axis=1)

    backtest_results['buysell'] = np.where(backtest_results['positions'] > 0, 'BUY', 'SELL')
    backtest_results['buysell'] = np.where(backtest_results['positions'] == 0, np.nan, backtest_results['buysell'])
    backtest_results['return'] = backtest_results['total_nav'].pct_change()

    # Compound intraday returns into daily ones by summing log returns.
    log_returns = np.log(1 + backtest_results[['return']].copy())
    log_returns_daily = log_returns.resample('D').sum()
    log_returns_daily['return'] = np.exp(log_returns_daily['return']) - 1

    if weekdays:
        returns_daily = log_returns_daily.copy()
    else:
        returns_daily = log_returns_daily[log_returns_daily.index.dayofweek < 5]

    # NOTE(review): the groups are weekly ('1W') although they are labelled by
    # month, so several columns share the same ('Monthly_RET', month_year)
    # label. Downstream consumers of 'Monthly_RET' may depend on this -
    # confirm before changing the frequency.
    monthly_returns = pd.DataFrame(
        returns_daily.groupby(pd.Grouper(freq='1W', label='right')).apply(lambda x: period_return(x['return'],
                                                                                                  annualize=False) * 100).rename(
            'return'))
    monthly_returns.index = pd.to_datetime(monthly_returns.index)
    monthly_returns['month_year'] = monthly_returns.index.month_name().str[
                                    :3] + '_' + monthly_returns.index.year.astype(str)
    monthly_returns['metric'] = 'Monthly_RET'
    monthly_returns = monthly_returns.set_index(['metric', 'month_year'])[['return']].T
    monthly_returns.index = [sim_key]

    # Apply the function to each group for period defined in config
    metrics_table_eop = returns_daily.groupby(pd.Grouper(freq=eop_freq))['return'].apply(calculate_metrics).reset_index()
    metrics_table_eop.columns = ['date', 'metric', 'measure']
    metrics_table_eop['year'] = metrics_table_eop['date'].dt.year.astype(str) + '_' + metrics_table_eop['date'].dt.month_name().str[
                                    :3]

    metrics_table_eop['metric'] = metrics_table_eop['metric'].apply(lambda x: f"EOP_{x}")
    metrics_table_eop = metrics_table_eop.set_index(['metric', 'year'])[['measure']].T
    metrics_table_eop.index = [sim_key]

    # Inception-to-date metrics over the whole daily return series.
    metrics_table_itd = pd.DataFrame(calculate_metrics(returns_daily['return']), columns=[sim_key]).T
    metrics_table_itd.columns = pd.MultiIndex.from_product([['ITD'], metrics_table_itd.columns])
    final_df = pd.concat([metrics_table_itd, metrics_table_eop, monthly_returns, n_trades], axis=1)

    return final_df, returns_daily['return'].rename(sim_key)


def assign_rankings(simulation_df_results, include_itd =True):
    """Rank simulations by the stability of their returns and by their metrics.

    Args:
        simulation_df_results: (pandas.DataFrame) one row per simulation with
            two-level columns that include the groups 'Monthly_RET', 'ITD' and
            'EOP_Sharpe', 'EOP_Vol', 'EOP_CAGR', 'EOP_Calmar', 'EOP_Sortino'
            (presumably the row-wise concatenation of the frames returned by
            simulation_results - verify against the caller).
        include_itd: (bool) when True, a rank of every ('ITD', ...) column is
            added to the result.

    Returns:
        (pandas.DataFrame) the rank columns of the simulations that pass the
        hit-ratio and mean-return filters, plus 'Final_Rank', the row sum of
        the columns listed in ``cols_to_rank``.
    """
    # Each value is passed as `ascending` to rank(): False gives rank 1 to the
    # largest value, True gives rank 1 to the smallest value.
    # NOTE(review): max_dd reports the drawdown as a non-positive percentage,
    # so 'MaxDD': True gives rank 1 to the deepest drawdown - confirm the
    # intended direction.
    ranking_rules = {'Sharpe': False, 'CAGR': False, 'Calmar': False, 'Sortino': False,
                     'MaxDD': True, 'Vol': True, 'Trades': False, 'HitRatio':False}

    ranking_simulations = []

    data = simulation_df_results.loc[:, ('Monthly_RET', slice(None))]

    def pct_positive(row):
        # Share (in percent) of the non-null values in the row that are > 0.
        positive_values = row[row > 0].count()
        total_values = row.count()
        return (positive_values / total_values) * 100

    # Applying the function to each row
    data['hit_ratio_monret'] = data.apply(pct_positive, axis=1)
    # Keep simulations with at least 50% positive periods and a positive mean return.
    filtered_byhitratio = data[data['hit_ratio_monret'] >= 50]
    filtered_byhitratio['monret_mean'] = filtered_byhitratio.loc[:, ('Monthly_RET', slice(None))].mean(axis=1)
    filtered_by_mean = filtered_byhitratio[filtered_byhitratio['monret_mean'] > 0]
    # NOTE(review): each apply below runs over every column present at that
    # point, i.e. the Monthly_RET values plus 'hit_ratio_monret' and
    # 'monret_mean'; QCD additionally sees the 'IQR' rank column and CV sees
    # both 'IQR' and 'QCD' - confirm this is intended.
    filtered_by_mean['IQR'] = filtered_by_mean.apply(iqr_indicator, axis=1).rank(ascending=True)
    filtered_by_mean['QCD'] = filtered_by_mean.apply(qcd_indicator, axis=1).rank(ascending=True)
    filtered_by_mean['CV'] = filtered_by_mean.apply(cv_indicator, axis=1).rank(ascending=True)
    ranking_simulations.append(filtered_by_mean['IQR'])
    ranking_simulations.append(filtered_by_mean['QCD'])
    ranking_simulations.append(filtered_by_mean['CV'])

    if include_itd:
        # Rank every ('ITD', <metric>) column, direction taken from ranking_rules.
        for c in simulation_df_results.loc[filtered_by_mean.index, ('ITD', slice(None))].columns:
            df_temp = simulation_df_results.loc[filtered_by_mean.index, c]
            df_temp = df_temp.rank(ascending=ranking_rules[c[-1]])
    
            # NOTE(review): debug output; it requires df_temp to be a DataFrame
            # (a Series has no .columns) - confirm what .loc returns here.
            print(df_temp.columns)
            
            # NOTE(review): cols_to_rank below looks the ITD columns up by
            # their original ('ITD', <metric>) tuples, so this rename is
            # apparently not expected to change the labels - confirm.
            ranking_simulations.append(df_temp.rename(columns = {c:f"ITD_{c[-1]}"}))

    cols_eoy = ['EOP_Sharpe', 'EOP_Vol', 'EOP_CAGR', 'EOP_Calmar', 'EOP_Sortino']
    for c in cols_eoy:
        name = c.split('_')[-1]
        df_temp = simulation_df_results.loc[filtered_by_mean.index, (c, slice(None))]
        # The IQR is sign-flipped for metrics whose ranking rule is False.
        df_temp[f'IQR_{name}'] = df_temp.apply(iqr_indicator, axis=1) * (1 if ranking_rules[name] else -1)
        # NOTE(review): the mean is taken after the IQR column was added, so
        # that column is part of the mean - confirm this is intended.
        df_temp[f'Mean_{name}'] = df_temp.mean(axis=1)
        # Dense rank of the (IQR, Mean) pairs.
        df_temp[f"Rank_{name}"] = df_temp[[f'IQR_{name}', f'Mean_{name}']].apply(tuple, axis=1) \
            .rank(method='dense', ascending=ranking_rules[name]).astype(int)
        
        ranking_simulations.append(df_temp[f"Rank_{name}"])
    
    
    
    # Dense rank of the hit ratio, direction taken from ranking_rules['HitRatio'].
    df_temp = pd.DataFrame(filtered_by_mean.loc[:, 'hit_ratio_monret'])
    df_temp['Rank_Hitratio']= df_temp['hit_ratio_monret'].rank(method='dense', ascending=ranking_rules['HitRatio']).astype(int)
    ranking_simulations.append(df_temp["Rank_Hitratio"])
    
    final_ranks=pd.concat(ranking_simulations, axis=1)
    # NOTE(review): debug output of the full rank table.
    print(final_ranks)
    #cols_to_rank =['IQR', 'QCD', 'CV', 'ITD_Sharpe', 'ITD_CAGR', 'ITD_Calmar', 'ITD_Sortino', 'ITD_MaxDD', 'ITD_Vol', 'ITD_Trades', 'Rank_Hitratio']
    # NOTE(review): the ('ITD', ...) columns are only appended when include_itd
    # is True, so this selection looks like it cannot succeed with
    # include_itd=False - confirm. The Rank_<metric> columns built from the
    # EOP groups are not listed here and so do not enter Final_Rank.
    cols_to_rank = [
        ('ITD', 'Sharpe'), ('ITD', 'CAGR'), ('ITD', 'Calmar'), ('ITD', 'Sortino'),
        ('ITD', 'MaxDD'), ('ITD', 'Vol'), ('ITD', 'Trades'), 'IQR', 'QCD', 'CV', 'Rank_Hitratio'
    ]
    final_ranks['Final_Rank'] = final_ranks[cols_to_rank].sum(axis=1)

    return final_ranks

def selected_by_corr(allrets, top_N, negative_only=False):
    """Pick the models whose returns are least correlated with the others.

    Args:
        allrets: (pandas.DataFrame) one column of returns per model.
        top_N: (int) number of models to select.
        negative_only: (bool) when True, every value that is not strictly
            negative is replaced by 0 before correlating, so only losses
            drive the result.

    Returns:
        (pandas.Index) labels of the ``top_N`` columns with the smallest mean
        correlation. Each column's correlation with itself is part of its
        mean.
    """
    if negative_only:
        # Vectorised equivalent of the former element-wise
        # applymap(lambda x: x if x < 0 else 0); DataFrame.applymap is
        # deprecated in recent pandas releases.
        allrets = allrets.where(allrets < 0, 0)

    mean_cor = allrets.corr().mean()
    return mean_cor.nsmallest(top_N).index

def selected_by_cov(allrets, top_N, negative_only=False):
    """Pick the models whose returns co-vary least with the others.

    Args:
        allrets: (pandas.DataFrame) one column of returns per model.
        top_N: (int) number of models to select.
        negative_only: (bool) when True, every value that is not strictly
            negative is replaced by 0 before computing covariances, so only
            losses drive the result.

    Returns:
        (pandas.Index) labels of the ``top_N`` columns with the smallest mean
        covariance. Each column's own variance is part of its mean.
    """
    if negative_only:
        # Vectorised equivalent of the former element-wise
        # applymap(lambda x: x if x < 0 else 0); DataFrame.applymap is
        # deprecated in recent pandas releases.
        allrets = allrets.where(allrets < 0, 0)

    mean_cov = allrets.cov().mean()
    return mean_cov.nsmallest(top_N).index

def equal_weight_ensemble(df):
    """Combine model returns into one equally weighted return series.

    Args:
        df: (pandas.DataFrame) one column of returns per model.

    Returns:
        (pandas.Series) row-wise sum of the columns, each scaled by
        ``1 / number_of_columns``.
    """
    weight = 1 / len(df.columns)
    return (df * weight).sum(axis=1)

def decorralated_portfolio(returns_all,rankings,top_rank=10,bottom_corr=3):
    """Choose the least correlated models among the best-ranked ones.

    Args:
        returns_all: (pandas.DataFrame) one column of returns per model.
        rankings: (pandas.DataFrame) indexed by model, with a 'Final_Rank'
            column; the ``top_rank`` rows with the lowest values are kept.
        top_rank: (int) number of best-ranked models to consider.
        bottom_corr: (int) number of models to return.

    Returns:
        The labels returned by ``selected_by_corr`` for the candidate models.
    """
    best_ranked = rankings.sort_values('Final_Rank').head(top_rank).index
    candidate_returns = returns_all[best_ranked]
    return selected_by_corr(candidate_returns, bottom_corr)


def performance(
        returns,
        annualize=True,
        annualization_factor=252,
        color="brown",
        label="LONG",
        verbose=True,
        plot=True):
    """Build a one-row performance summary for a series of returns.

    Args:
        returns: (pandas Series), Daily(period) noncumulative returns of the strategy
        annualize: boolean, False if annualization is not used, True otherwise
        annualization_factor : int (default 252), number of periods per year
            (252 for daily data, 52 for weekly, 12 for monthly,
            4 for quarterly).
        color: colour of the plotted line and of the highlighted row label
            when verbose=True
        label: name of the strategy; used as the row label and plot legend
        verbose: (boolean) True to display the styled summary table
        plot: (boolean) True to plot the cumulative return of the strategy

    Returns:
        pandas DataFrame with one row (labelled ``label``) and the columns
        Return, Annualized Volatility, Sharpe, Sortino ratio, Calmar ratio
        and Max Draw-Down.
    """
    shared = dict(annualize=annualize, annualization_factor=annualization_factor)

    vol_pct = np.round(period_volatility(returns, annualization_factor=annualization_factor) * 100, 2)
    ret_pct = np.round(period_return(returns, **shared) * 100, 2)
    sharpe = np.round(period_sharpe_ratio(returns, **shared), 2)
    sortino = np.round(sortino_ratio(returns, **shared), 2)
    calmar = np.round(calmar_ratio(returns, **shared), 2)

    drawdown = max_dd(returns)

    summary = pd.DataFrame({
        "Return": [ret_pct],
        "Annualized Volatility": [vol_pct],
        "Sharpe": [sharpe],
        "Sortino ratio": [sortino],
        "Calmar ratio": [calmar],
        "Max Draw-Down": [drawdown["Max Drawdown"]],
        "MDD Start": [drawdown["Start"]],
        "MDD End": [drawdown["End"]]
    }).rename(index={0: label})

    if verbose:
        pd.options.display.notebook_repr_html = True

        def highlight(index_values):
            # Colour only the index cell that carries the strategy label.
            return np.where(index_values == label, f"background-color: {color};", "")

        display(summary.style.apply_index(highlight))

    if plot:
        # The curve is anchored at 1 on the first date; compounding uses the
        # returns that come after it.
        first_date = returns.index.min()
        later_returns = returns[returns.index > first_date]
        equity = pd.concat([pd.Series([1], index=[first_date]), np.cumprod(later_returns + 1)])
        equity.plot(figsize=(12, 5), color=color, label=label)
        # Labels starting with an underscore get no legend call.
        if not label.startswith('_'):
            plt.legend()

    return summary[["Return", "Annualized Volatility", "Sharpe",
                    "Sortino ratio", "Calmar ratio",
                    "Max Draw-Down"]]
Editor is loading...
Leave a Comment