Untitled

 avatar
unknown
python
20 days ago
3.2 kB
3
Indexable
import asyncio
import json
import os
from datetime import datetime, timezone
import pytz
from websockets import connect
from termcolor import cprint


# list of symbols you want to track
symbols = [ "btcusdt", "ethusdt","solusdt", "bnbusdt", "dogeusdt"]
websocket_url_base = "wss://fstream.binance.com/ws/"
trades_filename = "binance_trades.csv"

#check if the csv file exist

if not os.path.isfile(trades_filename):
    with open(trades_filename, "w") as f:
        f.write("Event Time, Symbol, Aggregate Trade ID, Price, Quantity, First Trade ID, Is Buyer Maker\n")

class TradeAggregator:
    def __init__(self):
        self.trade_buckets= {}

    async def add_trade(self,symbol, second, usd_size, is_buyer_make):
        trade_key = (symbol,second,is_buyer_make)
        self.trade_buckets[trade_key] = self.trade_buckets.get(trade_key, 0) + usd_size

    async def check_and_print_trade(self):
        timestamp_now = datetime.now(timezone.utc).strftime("%H:%M:%S")
        deletetions = []
        for trade_key, usd_size in self.trade_buckets.items():
            symbol,second, is_buyer_maker = trade_key
            if second < timestamp_now and usd_size > 499999:
                attrs= ["bold"]
                back_color = "on_blue" if not is_buyer_maker else "on_magenta"
                trade_type = "BUY" if not is_buyer_maker else "SELL"
                if usd_size > 2999999:
                    usd_size = usd_size / 1000000
                    cprint (f"\033[5m{trade_key} {symbol} {second} ${usd_size:.2f}m\033[0m","white", back_color, attrs= attrs)
                else:
                    usd_size = usd_size / 1000000
                    cprint(f"{trade_type} {symbol} {second} ${usd_size:.2f}m", "white", back_color, attrs=attrs)
                deletetions.append(trade_key)
        for key in deletetions:
            del self.trade_buckets[key]

trade_aggregator = TradeAggregator()

async def binance_trade_stream (uri, symbol, filename, aggregator):
     async with connect(uri) as websocket:
         while True:
             try:
                 message = await websocket.recv()
                 data = json.loads(message)
                 usd_size = float(data["p"] * float(data["q"]))
                 trade_time = datetime.fromtimestamp(data["T"]/1000), pytz.timezone("Europe/Istanbul")
                 readeable_trade_time = trade_time.strftime("%H:%M:%S")

                 await aggregator.add_trade(symbol.upper().replace("USDT", ""), readeable_trade_time, usd_size, data["m"]  )
             except:
                 await asyncio.sleep(5)

async def print_aggregated_trades_every_second(aggregator):
    while True:
        await asyncio.sleep(1)
        await aggregator.check_and_print_trade()

async def main():
    filename = "binance_trades_big.csv"
    trade_stream_tasks = [binance_trade_stream(f"{websocket_url_base}{symbol}@aggTrade", symbol, filename, trade_aggregator) for symbol in symbols ]
    print_task = asyncio.create_task(print_aggregated_trades_every_second(trade_aggregator))
    await asyncio.gather(*trade_stream_tasks, print_task)

asyncio. run (main())
Leave a Comment