mail@pastecode.io avatar
7 months ago
3.5 kB
import pandas as pd
from indicators import rsi

import numpy as np
import talib

from clients import OkxClient
from models import OKXTimeFrame, OKXCurrencyPair

def calculate_rsi(data, period=14):
    """Calculate the Relative Strength Index (RSI)"""
    return talib.RSI(data, timeperiod=period)

def find_pivots(data, lbL, lbR, pivot_type='low'):
    """Find pivot points in the data"""
    pivots = np.full(len(data), False)
    for i in range(lbL, len(data) - lbR):
        window = data[i - lbL:i + lbR + 1]
        if pivot_type == 'low':
            if data[i] == min(window):
                pivots[i] = True
            if data[i] == max(window):
                pivots[i] = True
    return pivots

def bars_since(condition):
    """Calculate bars since the last true condition"""
    matches = np.where(condition)[0]
    return np.full_like(condition, np.nan).astype('float').cumsum() - matches

def in_range(index, condition, rangeLower, rangeUpper):
    """Check if index is within range since the last condition"""
    bars = bars_since(condition)
    return rangeLower <= bars[index] <= rangeUpper

def last_value_when(condition, series):
    """Return the last value of 'series' when 'condition' was True"""
    filtered_series = series[condition]
    return filtered_series.iloc[-1] if not filtered_series.empty else None

def find_divergences(price, rsi, lbL, lbR, rangeLower, rangeUpper):
    """Find RSI divergences"""
    regular_bullish = []
    regular_bearish = []
    hidden_bullish = []
    hidden_bearish = []

    price_pivot_lows = find_pivots(price, lbL, lbR)
    price_pivot_highs = find_pivots(price, lbL, lbR, pivot_type='high')

    for i in range(lbL + lbR, len(price)):
        # Regular Bullish Divergence
        if price_pivot_lows[i] and in_range(i, price_pivot_lows, rangeLower, rangeUpper):
            last_pivot_low_rsi = last_value_when(price_pivot_lows[:i], rsi)
            if last_pivot_low_rsi is not None and rsi[i] > last_pivot_low_rsi:

        # Regular Bearish Divergence
        if price_pivot_highs[i] and in_range(i, price_pivot_highs, rangeLower, rangeUpper):
            last_pivot_high_rsi = last_value_when(price_pivot_highs[:i], rsi)
            if last_pivot_high_rsi is not None and rsi[i] < last_pivot_high_rsi:

        # Hidden Bullish Divergence
        # ... (similar logic for hidden bullish and bearish divergences)

    return regular_bullish, regular_bearish, hidden_bullish, hidden_bearish

if __name__ == "__main__":

    # Example data loading (Replace this with your actual data source)
    # data = pd.read_csv('path_to_your_data.csv')
    # close_prices = data['close']

    # For demonstration, generating random data
    df = OkxClient().get_candles(
    rsi_indicator = rsi(data=df)
    df["RSI"] = rsi_indicator
    # Settings (These values are placeholders, adjust as necessary)
    len_rsi = 14
    lbR = 5
    lbL = 5
    rangeLower = 5
    rangeUpper = 60

    # Calculate RSI
    # rsi = calculate_rsi(df, len_rsi)

    # Find RSI Divergences
    reg_bull, reg_bear, hid_bull, hid_bear = find_divergences(df["close"], df["RSI"], lbL, lbR, rangeLower, rangeUpper)

    print("Regular Bullish Divergences at:", reg_bull)
    print("Regular Bearish Divergences at:",reg_bear)
Leave a Comment