Untitled

 avatar
unknown
plain_text
10 months ago
7.4 kB
8
Indexable
async def get_bulk_trades_for_expiration(self, underlying: str, start_date: str, end_date: str, exp: datetime, underlying_prices): 
        data = None
        try:
            data = await client.get_bulk_trade_quote(underlying, start_date, end_date, exp)
        except Exception as e:
            print(e)
        oi_map = {}
        try:
            oi = await client.get_bulk_oi(underlying, start_date, end_date, exp)
            for obj in oi['response']:
                if isinstance(obj, int) == False and 'ticks' in obj and len(obj['ticks']) > 0 and isinstance(obj['ticks'], list) == True:
                    contract = obj['contract']
                    symbol = str(contract['expiration'])[0:4] + '-' + str(contract['expiration'])[4:6] + '-' + str(contract['expiration'])[6:8] + ' ' + str(contract['strike']) + contract['right']
                    date = obj['ticks'][0][2] if len(obj['ticks'][0]) > 1 else 0
                    oi_map[symbol][date] = obj['ticks'][0][1] if len(obj['ticks'][0]) > 1 else 0
        except Exception as e:
            print(e)
        contracts = []
        schema = [
                        ('ms_of_day', pl.Float64), 
                        ('sequence', pl.Float64), 
                        ('ext_condition1', pl.Float64), 
                        ('ext_condition2', pl.Float64), 
                        ('ext_condition3', pl.Float64), 
                        ('ext_condition4', pl.Float64), 
                        ('condition', pl.Float64), 
                        ('size', pl.Float64), 
                        ('exchange', pl.Float64), 
                        ('price', pl.Float64), 
                        ('condition_flags', pl.Float64), 
                        ('price_flags', pl.Float64), 
                        ('volume_type', pl.Float64), 
                        ('records_back', pl.Float64),
                        ('ms_of_day2', pl.Float64),
                        ('bid_size', pl.Float64), 
                        ('bid_exchange', pl.Float64), 
                        ('bid', pl.Float64), 
                        ('bid_condition', pl.Float64), 
                        ('ask_size', pl.Float64), 
                        ('ask_exchange', pl.Float64), 
                        ('ask', pl.Float64), 
                        ('ask_condition', pl.Float64), 
                        ('date', pl.Utf8),
        ]
        if not data:
            return pl.DataFrame([], schema=schema)
        for obj in data['response']:
            if isinstance(obj, int) == False and 'ticks' in obj and len(obj['ticks']) > 0 and isinstance(obj['ticks'], list) == True:
                pd_df = pd.DataFrame(obj['ticks'])
                f = pl.DataFrame(
                    pd_df, 
                    schema=schema
                )
                contract = obj['contract']
                symbol = str(contract['expiration'])[0:4] + '-' + str(contract['expiration'])[4:6] + '-' + str(contract['expiration'])[6:8] + ' ' + str(contract['strike']) + contract['right']
                f = f.with_columns(
                    pl.lit(symbol).alias('symbol'),
                    ms_of_day=pl.col('ms_of_day').cast(pl.Float64),
                    ms_of_day2=pl.col('ms_of_day2').cast(pl.Float64),
                    sequence=pl.col('sequence').cast(pl.Float64),
                    ext_condition1=pl.col('ext_condition1').cast(pl.Float64),
                    ext_condition2=pl.col('ext_condition2').cast(pl.Float64),
                    ext_condition3=pl.col('ext_condition3').cast(pl.Float64),
                    ext_condition4=pl.col('ext_condition4').cast(pl.Float64),
                    condition=pl.col('condition').cast(pl.Float64),
                    size=pl.col('size').cast(pl.Float64),
                    exchange=pl.col('exchange').cast(pl.Float64),
                    condition_flags=pl.col('condition_flags').cast(pl.Float64),
                    price_flags=pl.col('price_flags').cast(pl.Float64),
                    volume_type=pl.col('volume_type').cast(pl.Float64),
                    records_back=pl.col('records_back').cast(pl.Float64),
                    bid_condition=pl.col('bid_condition').cast(pl.Float64),
                    bid_size=pl.col('bid_size').cast(pl.Float64),
                    bid_exchange=pl.col('bid_exchange').cast(pl.Float64),
                    ask_exchange=pl.col('ask_exchange').cast(pl.Float64),
                    ask_size=pl.col('ask_size').cast(pl.Float64),
                    ask_condition=pl.col('ask_condition').cast(pl.Float64),
                    volume_on_day=pl.col('size').cumsum().cast(pl.Float64),
                    mid=(pl.col('bid') + pl.col('ask')) / 2.0,
                    date=pl.col('date').cast(pl.Utf8)
                )
                f = f.with_columns(
                    flag=pl.col('symbol').apply(lambda x: 1.0 if x[-1] == 'C' else -1.0),
                    timestamp=pl.col('date').str.to_datetime("%Y%m%d").dt.timestamp().cast(pl.Float64)/1000 + pl.col('ms_of_day').cast(pl.Float64),
                    side=pl.struct(['price', 'bid', 'mid', 'ask', 'condition']).apply(lambda x: self.side_of_market(x['price'], x['bid'], x['mid'], x['ask']) * (-1.0 if x['condition'] == 40.0 else 1.0)),
                    oi=pl.col('date').apply(lambda x: oi_map[symbol][x] if (symbol in oi_map and x in oi_map[symbol]) else 0.0)
                )
                f = f.with_columns(
                    flow=(pl.col('size')*pl.col('side')*pl.col('flag')).fill_null(0),
                    timestamp=pl.from_epoch(pl.col('timestamp')/1000, time_unit="s"),
                )
                    
                f = f.with_columns(
                    cumulative_flow=pl.col('flow').cumsum()
                )
#                 if symbol == '2024-06-21 20000C':
#                     print(oi_map['2024-06-21 20000C'])
#                     print(f)

                contracts.append(f)
        if len(contracts) <= 0:
            return pl.DataFrame([], schema=schema)
        print(oi_map)
        df = pl.concat(contracts, how="vertical")
        df = df.with_columns(
            edge=100.0 * pl.col('size') * (pl.col('price') - pl.col('mid')) * pl.col('flag'),
            premium=100.0 * pl.col('size') * pl.col('price') * pl.col('side') * pl.col('flag')
        )
        df = df.with_columns(
            sec=pl.col('ms_of_day') / 1000.0,
            call_premium=pl.col('premium') * pl.when(pl.col("flag") == 1.0).then(1.0).otherwise(0.0),
            put_premium=pl.col('premium') * pl.when(pl.col("flag") == -1.0).then(1.0).otherwise(0.0)
        )
        
        return df
    
    async def get_all_trades(self, underlying, start_date, end_date):
        expiration_dates = await self.get_expirations(underlying)
        expiration_dates = list(filter(lambda x: str(x) >= end_date, expiration_dates))
        trades = [self.get_bulk_trades_for_expiration(underlying, start_date, end_date, exp, pl.DataFrame([])) for exp in expiration_dates]
        trades = await asyncio.gather(*trades)
        trades = filter(lambda x: len(x) > 0, trades)
        df = pl.concat(trades, how="vertical")
        df = df.sort(['date', 'ms_of_day'], descending=[False, False])
        df = df.with_columns(
            expiration=pl.col('symbol').apply(lambda x: x.split(' ')[0]),
            strike=pl.col('symbol').apply(lambda x: x.split(' ')[1])
        ) 
        return df
Editor is loading...
Leave a Comment