Untitled
unknown
plain_text
10 months ago
7.4 kB
8
Indexable
async def get_bulk_trades_for_expiration(self, underlying: str, start_date: str, end_date: str, exp: datetime, underlying_prices):
    """Build a per-trade DataFrame for every contract of one expiration.

    Requests bulk trade/quote ticks and bulk open interest from the ``client``
    object defined outside this block, turns each contract's ticks into a
    polars frame enriched with derived columns (symbol, mid, side, flow,
    open interest, ...), and concatenates the per-contract frames.

    Args:
        underlying: Underlying symbol, forwarded to the client unchanged.
        start_date: Start of the requested range, forwarded unchanged.
        end_date: End of the requested range, forwarded unchanged.
        exp: Expiration to request, forwarded unchanged.
        underlying_prices: Unused here; kept so existing callers keep working.

    Returns:
        A polars DataFrame with one row per trade. When the trade request
        fails, returns nothing, or no contract carries ticks, an empty frame
        with only the raw tick columns is returned (the derived columns are
        absent in that case).

    Notes:
        Both client calls are best-effort: any exception is printed and the
        method continues with whatever data it has.
    """

    def contract_symbol(contract):
        """Build the 'YYYY-MM-DD <strike><right>' label used to key a contract."""
        expiration = str(contract['expiration'])
        return (
            expiration[0:4] + '-' + expiration[4:6] + '-' + expiration[6:8]
            + ' ' + str(contract['strike']) + contract['right']
        )

    def has_ticks(entry):
        """Return True for response entries that carry a non-empty tick list."""
        return (
            not isinstance(entry, int)
            and 'ticks' in entry
            and isinstance(entry['ticks'], list)
            and len(entry['ticks']) > 0
        )

    data = None
    try:
        data = await client.get_bulk_trade_quote(underlying, start_date, end_date, exp)
    except Exception as e:
        print(e)

    # symbol -> {date string -> open interest}
    oi_map = {}
    try:
        oi = await client.get_bulk_oi(underlying, start_date, end_date, exp)
        for obj in oi['response']:
            if not has_ticks(obj):
                continue
            # setdefault creates the inner dict; indexing oi_map[symbol]
            # directly raised KeyError and left the whole map empty.
            per_date = oi_map.setdefault(contract_symbol(obj['contract']), {})
            for tick in obj['ticks']:
                # tick[1] is read as the open interest and tick[2] as its
                # date, so at least three fields are required.
                if len(tick) > 2:
                    # Keys are strings because lookups use the Utf8 `date`
                    # column; values are floats so `oi` has one dtype across
                    # contracts. The first tick seen for a date wins.
                    # NOTE(review): assumes str(tick[2]) matches the trade
                    # ticks' date text (e.g. '20240621') - confirm.
                    per_date.setdefault(str(tick[2]), float(tick[1]))
    except Exception as e:
        print(e)

    # Raw tick layout, in positional order of the values inside each tick.
    float_columns = [
        'ms_of_day',
        'sequence',
        'ext_condition1',
        'ext_condition2',
        'ext_condition3',
        'ext_condition4',
        'condition',
        'size',
        'exchange',
        'price',
        'condition_flags',
        'price_flags',
        'volume_type',
        'records_back',
        'ms_of_day2',
        'bid_size',
        'bid_exchange',
        'bid',
        'bid_condition',
        'ask_size',
        'ask_exchange',
        'ask',
        'ask_condition',
    ]
    schema = [(name, pl.Float64) for name in float_columns] + [('date', pl.Utf8)]

    if not data:
        return pl.DataFrame([], schema=schema)

    contracts = []
    for obj in data['response']:
        if not has_ticks(obj):
            continue

        f = pl.DataFrame(pd.DataFrame(obj['ticks']), schema=schema)
        symbol = contract_symbol(obj['contract'])
        # +1.0 for symbols ending in 'C', -1.0 otherwise. Constant for the
        # whole contract, so it is computed once rather than per row.
        flag_value = 1.0 if symbol[-1] == 'C' else -1.0
        symbol_oi = oi_map.get(symbol, {})

        f = f.with_columns(
            pl.lit(symbol).alias('symbol'),
            # Explicit casts keep every per-contract frame on an identical
            # schema, which the vertical concat below requires.
            *[pl.col(name).cast(pl.Float64) for name in float_columns],
            volume_on_day=pl.col('size').cumsum().cast(pl.Float64),
            mid=(pl.col('bid') + pl.col('ask')) / 2.0,
            date=pl.col('date').cast(pl.Utf8)
        )
        f = f.with_columns(
            flag=pl.lit(flag_value),
            timestamp=pl.col('date').str.to_datetime("%Y%m%d").dt.timestamp().cast(pl.Float64)/1000 + pl.col('ms_of_day').cast(pl.Float64),
            # The sign from side_of_market is inverted when condition is 40.0.
            # NOTE(review): the meaning of condition code 40 comes from the
            # data feed - confirm against its documentation.
            side=pl.struct(['price', 'bid', 'mid', 'ask', 'condition']).apply(
                lambda x: self.side_of_market(x['price'], x['bid'], x['mid'], x['ask'])
                * (-1.0 if x['condition'] == 40.0 else 1.0)
            ),
            # Dates without a recorded open interest fall back to 0.0.
            oi=pl.col('date').apply(lambda x: symbol_oi.get(x, 0.0)).cast(pl.Float64)
        )
        f = f.with_columns(
            flow=(pl.col('size')*pl.col('side')*pl.col('flag')).fill_null(0),
            timestamp=pl.from_epoch(pl.col('timestamp')/1000, time_unit="s"),
        )
        f = f.with_columns(
            cumulative_flow=pl.col('flow').cumsum()
        )
        contracts.append(f)

    if not contracts:
        return pl.DataFrame([], schema=schema)

    df = pl.concat(contracts, how="vertical")
    # NOTE(review): 100.0 is presumably the option contract multiplier - confirm.
    df = df.with_columns(
        edge=100.0 * pl.col('size') * (pl.col('price') - pl.col('mid')) * pl.col('flag'),
        premium=100.0 * pl.col('size') * pl.col('price') * pl.col('side') * pl.col('flag')
    )
    df = df.with_columns(
        sec=pl.col('ms_of_day') / 1000.0,
        call_premium=pl.col('premium') * pl.when(pl.col("flag") == 1.0).then(1.0).otherwise(0.0),
        put_premium=pl.col('premium') * pl.when(pl.col("flag") == -1.0).then(1.0).otherwise(0.0)
    )
    return df


async def get_all_trades(self, underlying, start_date, end_date):
    """Collect trades for every expiration on or after ``end_date``.

    Expirations come from ``self.get_expirations`` and are kept when their
    string form compares ``>= end_date``. The per-expiration requests run
    concurrently through ``asyncio.gather``.

    Args:
        underlying: Underlying symbol, forwarded unchanged.
        start_date: Start of the requested range, forwarded unchanged.
        end_date: End of the requested range; also the expiration cut-off.

    Returns:
        A polars DataFrame of all trades sorted by ``date`` then
        ``ms_of_day``, with ``expiration`` and ``strike`` columns split out
        of ``symbol``. When no expiration yields any rows, an empty
        DataFrame with no columns is returned.
    """
    expiration_dates = await self.get_expirations(underlying)
    live_expirations = [exp for exp in expiration_dates if str(exp) >= end_date]

    pending = [
        self.get_bulk_trades_for_expiration(underlying, start_date, end_date, exp, pl.DataFrame([]))
        for exp in live_expirations
    ]
    results = await asyncio.gather(*pending)

    # Empty frames carry only the raw tick columns, so they must be dropped
    # before a vertical concat with fully enriched frames.
    frames = [frame for frame in results if len(frame) > 0]
    if not frames:
        # pl.concat rejects an empty input; no trades is not an error here.
        return pl.DataFrame()

    df = pl.concat(frames, how="vertical")
    df = df.sort(['date', 'ms_of_day'], descending=[False, False])
    df = df.with_columns(
        expiration=pl.col('symbol').apply(lambda x: x.split(' ')[0]),
        strike=pl.col('symbol').apply(lambda x: x.split(' ')[1])
    )
    return df
Editor is loading...
Leave a Comment