Untitled
user_1226576
plain_text
3 years ago
12 kB
12
Indexable
# region imports
from AlgorithmImports import *
# endregion
class AlertBrownCaribou(QCAlgorithm):
    """Session-range breakout strategy on BTCUSD minute bars.

    Flow, as implemented below:

    1. From the "premarket" time until the "market open" time the algorithm
       records the highest high and lowest low seen (``max_point`` /
       ``min_point``).
    2. While the market is flagged open, a bar that exceeds that range arms
       a setup and records the most recent swing point from the last
       7 minute bars.
    3. Once price crosses back through that swing point, the bars collected
       since the break are scanned for a three-bar gap ("FVG" in the
       attribute names). If one exists, a limit entry, a stop-market stop
       loss and a limit take profit are submitted.
    4. A fill of the stop loss or take profit cancels the remaining orders,
       flattens the position and resets the state.

    NOTE(review): the original paste had lost all indentation; block nesting
    here is reconstructed from the statement order. In particular the state
    reset after order submission is assumed to sit inside the innermost
    ``if`` of each setup method - confirm against the author's copy.
    """

    def Initialize(self):
        """Configure the backtest window, cash, data subscription and state."""
        self.SetStartDate(2022, 1, 1)
        self.SetEndDate(2022, 5, 1)
        self.SetCash(100000)
        self.symbol = self.AddCrypto(
            "BTCUSD", Resolution.Minute, Market.Bitfinex
        ).Symbol
        self.init_vars()

    def init_vars(self):
        """(Re)set every piece of per-session state to its starting value.

        Called from ``Initialize``, at the daily close, and after a trade is
        exited. Note that it also clears ``market_is_open`` and
        ``premarket_time``, so no new setup is evaluated until the next
        market-open time is reached in ``OnData``.
        """
        # Session schedule (hour/minute compared against self.Time).
        self.market_opening_hour = 2
        self.market_opening_minute = 59
        self.market_close_hour = 23
        self.market_close_minute = 0
        self.market_is_open = False
        self.premarket_opening_hour = 2
        self.premarket_opening_minute = 0
        self.premarket_time = False

        # Not read anywhere in this class; the (misspelled) name is kept in
        # case external code relies on the attribute.
        self.quatity = 0

        # Running range extremes; the sentinels guarantee the first bar
        # compared against them updates both.
        self.max_point = 0
        self.min_point = 10000000000
        self.max_break = False
        self.min_break = False
        self.last_swing_low = None
        self.last_swing_high = None

        # Bars collected since the latest range break, scanned for gaps.
        self.short_FVG_potential_area = []
        self.short_FVG_high = None
        self.short_FVG_low = None
        self.long_FVG_potential_area = []
        self.long_FVG_high = None
        self.long_FVG_low = None

        # Order tickets of the currently active bracket, if any.
        self.short_entry_ticket = None
        self.short_stop_loss_ticket = None
        self.short_take_profit_ticket = None
        self.inside_trade = False
        self.long_entry_ticket = None
        self.long_stop_loss_ticket = None
        self.long_take_profit_ticket = None

    def find_swing_low(self, low_series):
        """Return the most recent local minimum in ``low_series``.

        A local minimum is a value strictly lower than both neighbours.
        Returns ``None`` when there is none (including inputs shorter than
        three values).
        """
        # Copy to a list so indexing is purely positional, whatever index
        # the incoming sequence (e.g. a pandas Series) carries.
        lows = list(low_series)
        for index in range(len(lows) - 2, 0, -1):
            if lows[index - 1] > lows[index] < lows[index + 1]:
                return lows[index]
        return None

    def find_swing_high(self, high_series):
        """Return the most recent local maximum in ``high_series``.

        A local maximum is a value strictly higher than both neighbours.
        Returns ``None`` when there is none (including inputs shorter than
        three values).
        """
        # Positional copy; see find_swing_low.
        highs = list(high_series)
        for index in range(len(highs) - 2, 0, -1):
            if highs[index - 1] < highs[index] > highs[index + 1]:
                return highs[index]
        return None

    def short_find_FVG(self, array):
        """Find the first downward three-bar gap in ``array``.

        A gap exists where a bar's ``Low`` is above the ``High`` of the bar
        two positions later. Returns ``(upper, lower)`` bounds of the gap,
        or ``(None, None)`` if no gap exists or fewer than three bars are
        given.
        """
        for first, third in zip(array, array[2:]):
            if first.Low > third.High:
                return first.Low, third.High
        return None, None

    def long_find_FVG(self, array):
        """Find the first upward three-bar gap in ``array``.

        A gap exists where a bar's ``High`` is below the ``Low`` of the bar
        two positions later. Returns ``(lower, upper)`` bounds of the gap,
        or ``(None, None)`` if no gap exists or fewer than three bars are
        given.
        """
        for first, third in zip(array, array[2:]):
            if first.High < third.Low:
                return first.High, third.Low
        return None, None

    def deal_with_short_setup(self, data):
        """Track upside range breaks and submit a short bracket when ready.

        ``data`` is the current slice and must contain a bar for
        ``self.symbol``.
        """
        if not self.market_is_open:
            return

        bar = data[self.symbol]

        if self.max_point < bar.High:
            # New session high: restart gap collection and re-anchor the
            # swing low on the last 7 minute bars.
            self.short_FVG_potential_area = []
            self.max_break = True
            recent_lows = self.History(self.symbol, 7, Resolution.Minute).low
            self.max_point = bar.High
            self.last_swing_low = self.find_swing_low(recent_lows)

        if (
            not self.max_break
            or self.last_swing_low is None
            or self.inside_trade
        ):
            return

        self.short_FVG_potential_area.append(bar)

        # Wait until price trades below the recorded swing low.
        if self.last_swing_low <= bar.Low:
            return

        self.short_FVG_high, self.short_FVG_low = self.short_find_FVG(
            self.short_FVG_potential_area
        )
        if self.short_FVG_high is None or self.short_FVG_low is None:
            return

        entry_point = round(self.short_FVG_low, 5)
        stop_loss = round(self.max_point * 1.0503, 5)
        # Fixed size. The original computed a cash/risk based size and then
        # immediately overwrote it with 1; only the effective value is kept.
        # NOTE(review): the long side still sizes from cash - confirm which
        # is intended.
        quantity = 1
        # Target is four times the entry-to-stop distance below the entry.
        take_profit = round(entry_point - 4 * (stop_loss - entry_point), 5)

        # NOTE(review): the protective orders are live immediately, so one
        # of them can fill before the entry does - confirm this is intended.
        self.short_entry_ticket = self.LimitOrder(
            self.symbol, -quantity, entry_point, 'Short position'
        )
        self.short_stop_loss_ticket = self.StopMarketOrder(
            self.symbol, quantity, stop_loss, 'Short stoploss'
        )
        self.short_take_profit_ticket = self.LimitOrder(
            self.symbol, quantity, take_profit, 'Short take profit'
        )

        self.inside_trade = True
        self.max_break = False
        self.short_FVG_high = None
        self.short_FVG_low = None
        self.last_swing_low = None

    def deal_with_long_setup(self, data):
        """Track downside range breaks and submit a long bracket when ready.

        ``data`` is the current slice and must contain a bar for
        ``self.symbol``.
        """
        if not self.market_is_open:
            return

        bar = data[self.symbol]

        if self.min_point > bar.Low:
            self.Log('here')
            # New session low: restart gap collection and re-anchor the
            # swing high on the last 7 minute bars.
            self.long_FVG_potential_area = []
            self.min_break = True
            recent_highs = self.History(self.symbol, 7, Resolution.Minute).high
            self.min_point = bar.Low
            self.last_swing_high = self.find_swing_high(recent_highs)

        if (
            not self.min_break
            or self.last_swing_high is None
            or self.inside_trade
        ):
            return

        self.Log('here2')
        self.long_FVG_potential_area.append(bar)

        # Wait until price trades above the recorded swing high.
        if self.last_swing_high >= bar.High:
            return

        self.long_FVG_low, self.long_FVG_high = self.long_find_FVG(
            self.long_FVG_potential_area
        )
        if self.long_FVG_low is None or self.long_FVG_high is None:
            return

        self.Log('here3')
        entry_point = round(self.long_FVG_high, 5)
        stop_loss = round(self.min_point * 0.95, 5)
        quantity = abs(self.Portfolio.Cash / (entry_point - stop_loss)) / 1000
        # Target is four times the entry-to-stop distance above the entry.
        take_profit = round(entry_point + 4 * (entry_point - stop_loss), 5)

        # NOTE(review): the protective orders are live immediately, so one
        # of them can fill before the entry does - confirm this is intended.
        self.long_entry_ticket = self.LimitOrder(
            self.symbol, quantity, entry_point, 'Long position'
        )
        self.long_stop_loss_ticket = self.StopMarketOrder(
            self.symbol, -quantity, stop_loss, 'Long stoploss'
        )
        self.long_take_profit_ticket = self.LimitOrder(
            self.symbol, -quantity, take_profit, 'Long take profit'
        )

        self.inside_trade = True
        self.min_break = False
        self.long_FVG_high = None
        self.long_FVG_low = None
        self.last_swing_high = None

    def OnData(self, data: Slice):
        """Advance the session state machine and evaluate both setups."""
        now = self.Time

        if (
            now.hour == self.premarket_opening_hour
            and now.minute == self.premarket_opening_minute
        ):
            self.premarket_time = True
            self.market_is_open = False

        if (
            now.hour == self.market_opening_hour
            and now.minute == self.market_opening_minute
        ):
            self.market_is_open = True
            self.premarket_time = False

        past_close = (
            now.hour == self.market_close_hour
            and now.minute > self.market_close_minute
        ) or now.hour > self.market_close_hour
        if past_close:
            # End of session: drop all state, pending orders and holdings.
            self.init_vars()
            self.Transactions.CancelOpenOrders(self.symbol)
            self.Liquidate()
            self.inside_trade = False

        # Everything below reads the current bar; skip slices without one
        # instead of failing on the lookup.
        if not data.ContainsKey(self.symbol) or data[self.symbol] is None:
            return

        if self.premarket_time:
            bar = data[self.symbol]
            # Build the pre-open range.
            if self.max_point < bar.High:
                self.max_point = bar.High
            if self.min_point > bar.Low:
                self.min_point = bar.Low

        self.deal_with_long_setup(data)
        self.deal_with_short_setup(data)

    def OnOrderEvent(self, orderEvent):
        """Close out and reset once a stop loss or take profit has filled."""
        if orderEvent.Status != OrderStatus.Filled:
            return

        if self._is_exit_fill(
            orderEvent,
            self.short_entry_ticket,
            self.short_stop_loss_ticket,
            self.short_take_profit_ticket,
        ):
            self._flatten()
            # Keep the session high across the reset.
            kept_max = self.max_point
            self.init_vars()
            self.max_point = kept_max

        if self._is_exit_fill(
            orderEvent,
            self.long_entry_ticket,
            self.long_stop_loss_ticket,
            self.long_take_profit_ticket,
        ):
            self._flatten()
            # Keep the session low across the reset. (The original wrote
            # this value into max_point, clobbering the high and leaving
            # min_point at its sentinel.)
            kept_min = self.min_point
            self.init_vars()
            self.min_point = kept_min

    @staticmethod
    def _is_exit_fill(orderEvent, entry_ticket, stop_ticket, profit_ticket):
        """Return True if ``orderEvent`` belongs to the bracket's stop or target."""
        if entry_ticket is None or stop_ticket is None or profit_ticket is None:
            return False
        return orderEvent.OrderId in (stop_ticket.OrderId, profit_ticket.OrderId)

    def _flatten(self):
        """Cancel the symbol's open orders and liquidate holdings."""
        self.Transactions.CancelOpenOrders(self.symbol)
        self.Liquidate()
        self.inside_trade = False
Editor is loading...