Untitled
unknown
python
9 months ago
3.2 kB
5
Indexable
import asyncio
import json
import os
from datetime import datetime, timezone
import pytz
from websockets import connect
from termcolor import cprint
# Trading pairs to subscribe to; one websocket stream is opened per entry.
symbols = ["btcusdt", "ethusdt", "solusdt", "bnbusdt", "dogeusdt"]

# Prefix that each stream name is appended to when building the socket URI.
websocket_url_base = "wss://fstream.binance.com/ws/"

# CSV file that receives the header row below.
trades_filename = "binance_trades.csv"

# Write the header row only when the CSV is not there yet, so an existing
# file is never truncated.
csv_is_missing = not os.path.isfile(trades_filename)
if csv_is_missing:
    with open(trades_filename, mode="w") as f:
        f.write("Event Time, Symbol, Aggregate Trade ID, Price, Quantity, First Trade ID, Is Buyer Maker\n")
class TradeAggregator:
    """Sum trade sizes per (symbol, second, side) and report the large buckets.

    Trades are grouped by the key ``(symbol, second, is_buyer_maker)`` where
    ``second`` is an ``"%H:%M:%S"`` label.  Once the wall clock (UTC) has moved
    past a bucket's second the bucket is finished: it is printed if its total
    exceeds the print threshold and is then discarded.
    """

    # A finished bucket is printed only when its USD total is above this value.
    PRINT_THRESHOLD_USD = 499999
    # Buckets above this value are additionally wrapped in the ANSI blink code.
    BLINK_THRESHOLD_USD = 2999999

    def __init__(self):
        # Maps (symbol, second_label, is_buyer_maker) -> accumulated USD size.
        self.trade_buckets = {}

    async def add_trade(self, symbol, second, usd_size, is_buyer_make):
        """Add ``usd_size`` to the bucket identified by symbol, second and side.

        Args:
            symbol: Display name of the traded asset.
            second: ``"%H:%M:%S"`` label of the second the trade belongs to.
                It is compared against the current UTC time when flushing,
                so it should be expressed in UTC as well.
            usd_size: Size of the trade to add to the bucket total.
            is_buyer_make: Buyer-is-maker flag; a truthy value is reported
                as a SELL, a falsy one as a BUY.
        """
        trade_key = (symbol, second, is_buyer_make)
        self.trade_buckets[trade_key] = self.trade_buckets.get(trade_key, 0) + usd_size

    async def check_and_print_trade(self):
        """Print every finished bucket above the threshold, then drop all finished buckets.

        A bucket is finished when its second label sorts before the current
        UTC ``"%H:%M:%S"`` string.  Finished buckets are removed whether or
        not they were large enough to print; otherwise small buckets would
        pile up in ``trade_buckets`` for as long as the process runs.

        NOTE: labels are compared as plain strings, so a bucket labelled just
        before midnight is not seen as finished once the clock wraps to
        "00:00:00".
        """
        timestamp_now = datetime.now(timezone.utc).strftime("%H:%M:%S")

        # Snapshot the finished keys first so the dict is not mutated while
        # it is being iterated.
        finished_keys = [key for key in self.trade_buckets if key[1] < timestamp_now]

        for trade_key in finished_keys:
            usd_size = self.trade_buckets.pop(trade_key)
            if usd_size <= self.PRINT_THRESHOLD_USD:
                continue

            symbol, second, is_buyer_maker = trade_key
            attrs = ["bold"]
            back_color = "on_magenta" if is_buyer_maker else "on_blue"
            trade_type = "SELL" if is_buyer_maker else "BUY"
            usd_millions = usd_size / 1000000

            if usd_size > self.BLINK_THRESHOLD_USD:
                # Very large buckets blink.  The label is the trade type; the
                # original interpolated the whole key tuple here by mistake.
                cprint(f"\033[5m{trade_type} {symbol} {second} ${usd_millions:.2f}m\033[0m", "white", back_color, attrs=attrs)
            else:
                cprint(f"{trade_type} {symbol} {second} ${usd_millions:.2f}m", "white", back_color, attrs=attrs)
# Module-level aggregator instance; main() passes it to every stream task and to the printer task.
trade_aggregator = TradeAggregator()
async def binance_trade_stream(uri, symbol, filename, aggregator):
    """Consume one trade stream forever and feed each trade into ``aggregator``.

    Each message is decoded as JSON; its ``"p"`` and ``"q"`` fields are
    multiplied to get the trade size, ``"T"`` (milliseconds since the epoch)
    becomes the ``"%H:%M:%S"`` bucket label and ``"m"`` is passed on as the
    buyer-is-maker flag.

    Args:
        uri: Websocket URI to connect to.
        symbol: Stream symbol; upper-cased with "USDT" removed for display.
        filename: Accepted for interface compatibility; not used here.
        aggregator: Object exposing ``await add_trade(symbol, second,
            usd_size, is_buyer_maker)``.

    The coroutine never returns normally.  On a connection error it reports
    the error, waits five seconds and reconnects.  Cancellation propagates.
    """
    import sys  # local import: only needed for error reporting

    display_symbol = symbol.upper().replace("USDT", "")

    while True:
        try:
            async with connect(uri) as websocket:
                while True:
                    message = await websocket.recv()
                    try:
                        data = json.loads(message)
                        # Convert each field first, then multiply; the
                        # original multiplied the price string by a float.
                        usd_size = float(data["p"]) * float(data["q"])
                        # The label must be UTC because the aggregator
                        # compares it against the current UTC time.
                        trade_time = datetime.fromtimestamp(data["T"] / 1000, timezone.utc)
                        is_buyer_maker = data["m"]
                    except (KeyError, TypeError, ValueError):
                        # Not a well-formed trade message: skip it instead
                        # of stalling the whole stream.
                        continue

                    readable_trade_time = trade_time.strftime("%H:%M:%S")
                    await aggregator.add_trade(display_symbol, readable_trade_time, usd_size, is_buyer_maker)
        except asyncio.CancelledError:
            # Never swallow cancellation, so the task can be shut down.
            raise
        except Exception as exc:
            # Connection-level failure: report it, back off, then reconnect
            # by re-entering the outer loop.
            print(f"{symbol} stream error: {exc!r}; reconnecting in 5 seconds", file=sys.stderr)
            await asyncio.sleep(5)
async def print_aggregated_trades_every_second(aggregator):
    """Call ``aggregator.check_and_print_trade()`` once per second, forever.

    Each cycle sleeps first and checks afterwards, so the first check happens
    one second after the coroutine starts.  The coroutine never returns.
    """
    tick_seconds = 1
    while True:
        await asyncio.sleep(tick_seconds)
        await aggregator.check_and_print_trade()
async def main():
    """Start one stream coroutine per configured symbol plus the printer task.

    All of them share the module-level ``trade_aggregator`` and are awaited
    together with ``asyncio.gather``.
    """
    filename = "binance_trades_big.csv"

    # One stream coroutine per symbol, each on its own "<symbol>@aggTrade" URI.
    stream_coroutines = []
    for symbol in symbols:
        stream_uri = f"{websocket_url_base}{symbol}@aggTrade"
        stream_coroutines.append(
            binance_trade_stream(stream_uri, symbol, filename, trade_aggregator)
        )

    printer = asyncio.create_task(
        print_aggregated_trades_every_second(trade_aggregator)
    )

    await asyncio.gather(*stream_coroutines, printer)
# Script entry point: start the event loop and run main().
asyncio.run(main())
Editor is loading...
Leave a Comment