Untitled
unknown
plain_text
a year ago
7.4 kB
11
Indexable
async def get_bulk_trades_for_expiration(self, underlying: str, start_date: str, end_date: str, exp: datetime, underlying_prices):
    """Build one frame of trade/quote ticks for every contract of an expiration.

    Trade/quote ticks come from ``client.get_bulk_trade_quote`` and open
    interest from ``client.get_bulk_oi``. Each contract's ticks are loaded
    into a polars frame, tagged with a ``"YYYY-MM-DD <strike><right>"``
    symbol, enriched with derived columns (``volume_on_day``, ``mid``,
    ``flag``, ``timestamp``, ``side``, ``oi``, ``flow``, ``cumulative_flow``)
    and finally concatenated, after which ``edge``, ``premium``, ``sec``,
    ``call_premium`` and ``put_premium`` are added.

    Args:
        underlying: Passed through to both client calls.
        start_date: Passed through to both client calls.
        end_date: Passed through to both client calls.
        exp: Expiration passed through to both client calls.
        underlying_prices: Accepted for interface compatibility; not used.

    Returns:
        A polars DataFrame. When the trade request fails, returns nothing,
        or yields no contract with ticks, the frame is empty and carries
        only the raw tick schema (none of the derived columns).
    """
    schema = [
        ('ms_of_day', pl.Float64),
        ('sequence', pl.Float64),
        ('ext_condition1', pl.Float64),
        ('ext_condition2', pl.Float64),
        ('ext_condition3', pl.Float64),
        ('ext_condition4', pl.Float64),
        ('condition', pl.Float64),
        ('size', pl.Float64),
        ('exchange', pl.Float64),
        ('price', pl.Float64),
        ('condition_flags', pl.Float64),
        ('price_flags', pl.Float64),
        ('volume_type', pl.Float64),
        ('records_back', pl.Float64),
        ('ms_of_day2', pl.Float64),
        ('bid_size', pl.Float64),
        ('bid_exchange', pl.Float64),
        ('bid', pl.Float64),
        ('bid_condition', pl.Float64),
        ('ask_size', pl.Float64),
        ('ask_exchange', pl.Float64),
        ('ask', pl.Float64),
        ('ask_condition', pl.Float64),
        ('date', pl.Utf8),
    ]
    # Every schema column except 'date' is numeric and is forced to Float64.
    numeric_columns = [name for name, _ in schema if name != 'date']

    def has_ticks(entry):
        # Response entries may be bare ints; only entries carrying a
        # non-empty list under 'ticks' are usable.
        return (
            not isinstance(entry, int)
            and 'ticks' in entry
            and isinstance(entry['ticks'], list)
            and len(entry['ticks']) > 0
        )

    def contract_symbol(contract):
        # 'expiration' is sliced as YYYYMMDD and rendered as YYYY-MM-DD.
        expiration = str(contract['expiration'])
        return (
            f"{expiration[0:4]}-{expiration[4:6]}-{expiration[6:8]}"
            f" {contract['strike']}{contract['right']}"
        )

    data = None
    try:
        data = await client.get_bulk_trade_quote(underlying, start_date, end_date, exp)
    except Exception as e:
        # Best effort: a failed request yields an empty result below.
        print(e)
    if not data:
        # Without trades the open-interest lookup would never be consulted.
        return pl.DataFrame([], schema=schema)

    # symbol -> {date string -> open interest}
    oi_map = {}
    try:
        oi = await client.get_bulk_oi(underlying, start_date, end_date, exp)
        for obj in oi['response']:
            if not has_ticks(obj):
                continue
            # NOTE(review): only the first tick of each contract is recorded,
            # as before; index 1 is read as the OI value and index 2 as the
            # date — confirm against the client's tick layout.
            first_tick = obj['ticks'][0]
            oi_date = first_tick[2] if len(first_tick) > 2 else 0
            oi_value = first_tick[1] if len(first_tick) > 1 else 0
            # Keys are stringified because the lookup below uses the
            # Utf8-cast 'date' column; values are floats so the 'oi' column
            # has one dtype in every per-contract frame.
            oi_map.setdefault(contract_symbol(obj['contract']), {})[str(oi_date)] = float(oi_value)
    except Exception as e:
        # Best effort: missing open interest leaves 'oi' at 0.0.
        print(e)

    contracts = []
    for obj in data['response']:
        if not has_ticks(obj):
            continue
        symbol = contract_symbol(obj['contract'])
        # +1.0 for symbols ending in 'C', -1.0 for everything else.
        flag = 1.0 if symbol[-1] == 'C' else -1.0
        symbol_oi = oi_map.get(symbol, {})

        f = pl.DataFrame(pd.DataFrame(obj['ticks']), schema=schema)
        f = f.with_columns(
            pl.col(numeric_columns).cast(pl.Float64),
            pl.col('date').cast(pl.Utf8),
            pl.lit(symbol).alias('symbol'),
            volume_on_day=pl.col('size').cumsum().cast(pl.Float64),
            mid=(pl.col('bid') + pl.col('ask')) / 2.0,
        )
        f = f.with_columns(
            flag=pl.lit(flag),
            # Midnight of 'date' reduced to milliseconds, plus ms_of_day.
            timestamp=pl.col('date').str.to_datetime("%Y%m%d").dt.timestamp().cast(pl.Float64) / 1000
            + pl.col('ms_of_day').cast(pl.Float64),
            # The side_of_market result is negated when condition == 40.0
            # (the meaning of that code is not visible in this file).
            side=pl.struct(['price', 'bid', 'mid', 'ask', 'condition']).apply(
                lambda row: self.side_of_market(row['price'], row['bid'], row['mid'], row['ask'])
                * (-1.0 if row['condition'] == 40.0 else 1.0)
            ),
            oi=pl.col('date').apply(lambda day: symbol_oi.get(day, 0.0)),
        )
        f = f.with_columns(
            flow=(pl.col('size') * pl.col('side') * pl.col('flag')).fill_null(0),
            timestamp=pl.from_epoch(pl.col('timestamp') / 1000, time_unit="s"),
        )
        f = f.with_columns(
            cumulative_flow=pl.col('flow').cumsum()
        )
        contracts.append(f)

    if not contracts:
        return pl.DataFrame([], schema=schema)

    df = pl.concat(contracts, how="vertical")
    df = df.with_columns(
        edge=100.0 * pl.col('size') * (pl.col('price') - pl.col('mid')) * pl.col('flag'),
        premium=100.0 * pl.col('size') * pl.col('price') * pl.col('side') * pl.col('flag'),
    )
    df = df.with_columns(
        sec=pl.col('ms_of_day') / 1000.0,
        call_premium=pl.col('premium') * pl.when(pl.col("flag") == 1.0).then(1.0).otherwise(0.0),
        put_premium=pl.col('premium') * pl.when(pl.col("flag") == -1.0).then(1.0).otherwise(0.0),
    )
    return df
async def get_all_trades(self, underlying, start_date, end_date):
    """Collect trades for every qualifying expiration into one sorted frame.

    Expirations come from ``self.get_expirations``; only those whose string
    form compares ``>= end_date`` are kept. Trades for the kept expirations
    are fetched concurrently through ``get_bulk_trades_for_expiration``,
    empty results are dropped, and the rest are concatenated, sorted by
    ``date`` then ``ms_of_day`` (both ascending), and given ``expiration``
    and ``strike`` columns split out of ``symbol``.

    Args:
        underlying: Passed through to the expiration and trade lookups.
        start_date: Passed through to the per-expiration trade lookup.
        end_date: Passed through to the trade lookup and used as the lower
            bound for the expiration filter.

    Returns:
        A polars DataFrame of all trades, or an empty DataFrame with no
        columns when no expiration produced any rows.
    """
    expiration_dates = await self.get_expirations(underlying)
    # NOTE(review): this is a lexicographic string comparison; it is only a
    # date comparison if str(exp) and end_date share one sortable format —
    # confirm against get_expirations.
    expiration_dates = [exp for exp in expiration_dates if str(exp) >= end_date]

    frames = await asyncio.gather(*(
        self.get_bulk_trades_for_expiration(underlying, start_date, end_date, exp, pl.DataFrame([]))
        for exp in expiration_dates
    ))
    frames = [frame for frame in frames if len(frame) > 0]
    if not frames:
        # pl.concat rejects an empty input, so return an empty frame instead.
        return pl.DataFrame([])

    df = pl.concat(frames, how="vertical")
    df = df.sort(['date', 'ms_of_day'], descending=[False, False])
    # 'symbol' is "<expiration> <strike+right>"; split on the single space.
    df = df.with_columns(
        expiration=pl.col('symbol').apply(lambda x: x.split(' ')[0]),
        strike=pl.col('symbol').apply(lambda x: x.split(' ')[1]),
    )
    return df
Leave a Comment