DEXSCREENER_CLONE
unknown
python
8 days ago
7.3 kB
7
No Index
import os import requests import streamlit as st import pandas as pd import plotly.graph_objects as go BASE_URL = "https://api.coingecko.com/api/v3/" API_KEY = "YOUR-API-KEY" # set via env var in production def fetch_api(endpoint, params=None): """Helper to fetch from CoinGecko API""" if params is None: params = {} params["x_cg_demo_api_key"] = API_KEY response = requests.get(f"{BASE_URL}/{endpoint}", params=params) if response.status_code != 200: st.error(f"API Error: {response.json().get('error', 'Unknown error')}") return None return response.json() # ---------------- Sidebar Navigation ---------------- # st.sidebar.title("Navigation") networks_data = fetch_api("onchain/networks") selected_network = None selected_dex = None if networks_data: networks = [n["id"] for n in networks_data["data"]] selected_network = st.sidebar.selectbox("Select Network", networks) if selected_network: dexes_data = fetch_api(f"onchain/networks/{selected_network}/dexes") if dexes_data: dexes = [d["id"] for d in dexes_data["data"]] selected_dex = st.sidebar.selectbox("Select DEX", dexes) # ---------------- Main Screen ---------------- # st.title("DEXScreener Clone") # ---------------- Search & Filter Options ---------------- # with st.expander("Search & Filter Options"): min_volume = st.number_input("Min 24h Volume (USD)", min_value=0) min_liquidity = st.number_input("Min Liquidity (USD)", min_value=0) apply_filters = st.button("Apply Filters") st.markdown("---") st.write("### 🌍 Global Search (via CoinGecko API)") global_search_term = st.text_input("Search tokens globally") run_global_search = st.button("Run Global Search") # ---------------- Global Search Results (Outside Expander) ---------------- # if run_global_search and global_search_term: search_results = fetch_api("search", params={"query": global_search_term}) if search_results and "coins" in search_results: coins = search_results["coins"] if coins: search_df = pd.DataFrame(coins)[["id", "name", "symbol", "market_cap_rank", "large"]] st.subheader("Global Search Results") st.dataframe(search_df) else: st.info("No tokens found for that search term.") else: st.warning("Could not fetch search results right now.") # ---------------- Trending Pools Section ---------------- # st.subheader("🔥 Trending Pools Across Networks") trending_data = fetch_api("onchain/networks/trending_pools") if trending_data: trending_pools = trending_data.get("data", []) if trending_pools: trending_df = pd.DataFrame([p["attributes"] for p in trending_pools]) # Normalize nested fields trending_df["volume_usd_24h"] = trending_df["volume_usd"].apply( lambda x: x.get("h24") if isinstance(x, dict) else x ) trending_df["liquidity_usd"] = trending_df["reserve_in_usd"].apply( lambda x: x.get("value") if isinstance(x, dict) else x ) trending_df["volume_usd_24h"] = pd.to_numeric(trending_df["volume_usd_24h"], errors="coerce").fillna(0) trending_df["liquidity_usd"] = pd.to_numeric(trending_df["liquidity_usd"], errors="coerce").fillna(0) st.dataframe( trending_df[["name", "base_token_price_usd", "liquidity_usd", "volume_usd_24h"]].head(10) ) else: st.info("No trending pools found at the moment.") else: st.warning("Could not fetch trending pools right now.") # ---------------- Top-10 Pools for Selected Network + DEX ---------------- # if selected_network and selected_dex: pools_data = fetch_api(f"onchain/networks/{selected_network}/dexes/{selected_dex}/pools") if pools_data: pools = pools_data.get("data", []) df = pd.DataFrame([p["attributes"] for p in pools]) # 🔹 Flatten nested dict fields into numeric columns if "volume_usd" in df.columns: df["volume_usd_24h"] = df["volume_usd"].apply( lambda x: x.get("h24") if isinstance(x, dict) else x ) else: df["volume_usd_24h"] = 0 if "reserve_in_usd" in df.columns: df["liquidity_usd"] = df["reserve_in_usd"].apply( lambda x: x.get("value") if isinstance(x, dict) else x ) else: df["liquidity_usd"] = 0 # ✅ Convert to numeric df["volume_usd_24h"] = pd.to_numeric(df["volume_usd_24h"], errors="coerce").fillna(0) df["liquidity_usd"] = pd.to_numeric(df["liquidity_usd"], errors="coerce").fillna(0) # Apply filters if set if apply_filters: df = df[ (df["volume_usd_24h"] >= min_volume) & (df["liquidity_usd"] >= min_liquidity) ] st.subheader("Top-10 Tokens & Stats") top_df = df[["name", "base_token_price_usd", "liquidity_usd", "volume_usd_24h"]].head(10) st.dataframe(top_df) # Select token token_choice = st.selectbox("Select a token to view details", top_df["name"]) token_row = df[df["name"] == token_choice].iloc[0] # ---------------- Token Detail View ---------------- # st.header(token_choice) pool_address = token_row["address"] # Fetch OHLCV Data ohlcv_data = fetch_api( f"onchain/networks/{selected_network}/pools/{pool_address}/ohlcv/day" ) if ohlcv_data: ohlcv = ohlcv_data["data"]["attributes"]["ohlcv_list"] ohlcv_df = pd.DataFrame( ohlcv, columns=["timestamp", "open", "high", "low", "close", "volume"] ) ohlcv_df["date"] = pd.to_datetime(ohlcv_df["timestamp"], unit="s") # Candlestick + Volume subplot fig = go.Figure() fig.add_trace(go.Candlestick( x=ohlcv_df["date"], open=ohlcv_df["open"], high=ohlcv_df["high"], low=ohlcv_df["low"], close=ohlcv_df["close"], name="Price" )) fig.add_trace(go.Bar( x=ohlcv_df["date"], y=ohlcv_df["volume"], name="Volume", marker_color="lightblue", opacity=0.5, yaxis="y2" )) fig.update_layout( title=f"{token_choice} - OHLCV Chart", xaxis=dict(title="Date", rangeslider=dict(visible=False)), yaxis=dict(title="Price (USD)"), yaxis2=dict( title="Volume", overlaying="y", side="right", showgrid=False ), legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), height=600, ) st.plotly_chart(fig, use_container_width=True) # Basic Stats st.subheader("Basic Stats") st.write(f"**Price (USD):** {token_row['base_token_price_usd']}") st.write(f"**24h Volume (USD):** {token_row['volume_usd']}") st.write(f"**Liquidity (USD):** {token_row['reserve_in_usd']}")
Editor is loading...