DEXSCREENER_CLONE
unknown
python
2 months ago
7.3 kB
24
No Index
import os
import requests
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
# Root of the CoinGecko v3 REST API; endpoint paths are appended to this.
BASE_URL = "https://api.coingecko.com/api/v3/"
# Read the demo API key from the COINGECKO_API_KEY environment variable so a
# real key never has to be committed; the placeholder is kept as the fallback
# when the variable is unset.
API_KEY = os.environ.get("COINGECKO_API_KEY", "YOUR-API-KEY")
def fetch_api(endpoint, params=None):
    """Fetch a JSON document from the CoinGecko API.

    Args:
        endpoint: Path relative to ``BASE_URL``, e.g. ``"onchain/networks"``.
        params: Optional mapping of query parameters. It is copied before the
            API key is added, so the caller's mapping is never modified.

    Returns:
        The decoded JSON body when the server answers with HTTP 200.
        ``None`` on any failure (network error, non-200 status, or a body
        that is not valid JSON); the failure is reported with ``st.error``.
    """
    # Work on a copy: injecting the key into the caller's dict would leak it
    # into whatever the caller does with that dict afterwards.
    query = dict(params or {})
    query["x_cg_demo_api_key"] = API_KEY

    # BASE_URL already ends with "/"; normalise both sides so the request
    # URL never contains a doubled slash.
    url = f"{BASE_URL.rstrip('/')}/{endpoint.lstrip('/')}"

    try:
        # A timeout keeps a stalled connection from hanging the whole app.
        response = requests.get(url, params=query, timeout=10)
    except requests.RequestException as exc:
        # Only the exception type is shown: the full message can embed the
        # request URL, which carries the API key in its query string.
        st.error(f"API Error: {type(exc).__name__}")
        return None

    if response.status_code != 200:
        # Error bodies are not guaranteed to be JSON objects.
        try:
            payload = response.json()
        except ValueError:
            payload = None
        if isinstance(payload, dict):
            detail = payload.get("error", "Unknown error")
        else:
            detail = "Unknown error"
        st.error(f"API Error: {detail}")
        return None

    try:
        return response.json()
    except ValueError:
        st.error("API Error: response was not valid JSON")
        return None
# ---------------- Sidebar Navigation ---------------- #
st.sidebar.title("Navigation")

# Both selections stay None until their option list has been fetched, so the
# code further down can simply test them for truthiness.
selected_network = None
selected_dex = None

networks_data = fetch_api("onchain/networks")
if networks_data:
    # .get() so a payload without a "data" key yields an empty list instead
    # of raising KeyError.
    networks = [n["id"] for n in networks_data.get("data", [])]
    selected_network = st.sidebar.selectbox("Select Network", networks)

# The DEX list depends on the chosen network, so it is only requested once a
# network has actually been selected.
if selected_network:
    dexes_data = fetch_api(f"onchain/networks/{selected_network}/dexes")
    if dexes_data:
        dexes = [d["id"] for d in dexes_data.get("data", [])]
        selected_dex = st.sidebar.selectbox("Select DEX", dexes)
# ---------------- Main Screen ---------------- #
st.title("DEXScreener Clone")

# ---------------- Search & Filter Options ---------------- #
with st.expander("Search & Filter Options"):
    # Thresholds used to filter the pool table of the selected network/DEX.
    min_volume = st.number_input("Min 24h Volume (USD)", min_value=0)
    min_liquidity = st.number_input("Min Liquidity (USD)", min_value=0)

    # st.button() returns True only for the one rerun triggered by the click.
    # Remember the click in session state so the filters stay active when a
    # later widget interaction (e.g. picking a token) reruns the script.
    if st.button("Apply Filters"):
        st.session_state["filters_applied"] = True
    apply_filters = st.session_state.get("filters_applied", False)

    st.markdown("---")
    st.write("### 🌍 Global Search (via CoinGecko API)")
    global_search_term = st.text_input("Search tokens globally")
    # One-shot action: results are rendered only on the rerun caused by the
    # click.
    run_global_search = st.button("Run Global Search")
# ---------------- Global Search Results (Outside Expander) ---------------- #
# Runs only when the search button was clicked and a non-empty term was typed.
if run_global_search and global_search_term:
    search_results = fetch_api("search", params={"query": global_search_term})
    if not (search_results and "coins" in search_results):
        # Request failed or the payload has no "coins" section.
        st.warning("Could not fetch search results right now.")
    else:
        coins = search_results["coins"]
        if not coins:
            st.info("No tokens found for that search term.")
        else:
            # Columns shown in the results table, in display order.
            result_columns = ["id", "name", "symbol", "market_cap_rank", "large"]
            search_df = pd.DataFrame(coins)[result_columns]
            st.subheader("Global Search Results")
            st.dataframe(search_df)
# ---------------- Trending Pools Section ---------------- #
st.subheader("🔥 Trending Pools Across Networks")
trending_data = fetch_api("onchain/networks/trending_pools")
if trending_data:
    trending_pools = trending_data.get("data", [])
    if trending_pools:
        trending_df = pd.DataFrame(
            [p.get("attributes", {}) for p in trending_pools]
        )

        # Flatten nested fields into scalar columns. A missing source column
        # falls back to 0 rather than raising KeyError, matching how the
        # per-DEX pool table further down handles the same fields.
        if "volume_usd" in trending_df.columns:
            trending_df["volume_usd_24h"] = trending_df["volume_usd"].apply(
                lambda x: x.get("h24") if isinstance(x, dict) else x
            )
        else:
            trending_df["volume_usd_24h"] = 0

        if "reserve_in_usd" in trending_df.columns:
            trending_df["liquidity_usd"] = trending_df["reserve_in_usd"].apply(
                lambda x: x.get("value") if isinstance(x, dict) else x
            )
        else:
            trending_df["liquidity_usd"] = 0

        # Coerce to numbers; anything unparseable becomes 0.
        trending_df["volume_usd_24h"] = pd.to_numeric(
            trending_df["volume_usd_24h"], errors="coerce"
        ).fillna(0)
        trending_df["liquidity_usd"] = pd.to_numeric(
            trending_df["liquidity_usd"], errors="coerce"
        ).fillna(0)

        # reindex() selects the display columns like [[...]] would, but
        # yields an empty (NaN) column instead of KeyError if one is absent.
        trending_columns = [
            "name",
            "base_token_price_usd",
            "liquidity_usd",
            "volume_usd_24h",
        ]
        st.dataframe(trending_df.reindex(columns=trending_columns).head(10))
    else:
        st.info("No trending pools found at the moment.")
else:
    st.warning("Could not fetch trending pools right now.")
# ---------------- Top-10 Pools for Selected Network + DEX ---------------- #
# Shows the first ten pools of the chosen network/DEX, lets the user pick one,
# and renders its daily OHLCV chart plus a few headline numbers.
if selected_network and selected_dex:
    pools_data = fetch_api(
        f"onchain/networks/{selected_network}/dexes/{selected_dex}/pools"
    )
    if pools_data:
        pools = pools_data.get("data", [])
        df = pd.DataFrame([p["attributes"] for p in pools])

        # Flatten nested dict fields into scalar columns; fall back to 0 when
        # the source column is absent.
        if "volume_usd" in df.columns:
            df["volume_usd_24h"] = df["volume_usd"].apply(
                lambda x: x.get("h24") if isinstance(x, dict) else x
            )
        else:
            df["volume_usd_24h"] = 0

        if "reserve_in_usd" in df.columns:
            df["liquidity_usd"] = df["reserve_in_usd"].apply(
                lambda x: x.get("value") if isinstance(x, dict) else x
            )
        else:
            df["liquidity_usd"] = 0

        # Convert to numeric; anything unparseable becomes 0.
        df["volume_usd_24h"] = pd.to_numeric(
            df["volume_usd_24h"], errors="coerce"
        ).fillna(0)
        df["liquidity_usd"] = pd.to_numeric(
            df["liquidity_usd"], errors="coerce"
        ).fillna(0)

        # Apply filters if set
        if apply_filters:
            df = df[
                (df["volume_usd_24h"] >= min_volume)
                & (df["liquidity_usd"] >= min_liquidity)
            ]

        st.subheader("Top-10 Tokens & Stats")

        if df.empty:
            # No pools returned, or the filters excluded all of them. Without
            # this guard the column selection / .iloc[0] below would raise.
            st.info("No pools match the current selection and filters.")
        else:
            top_df = df[
                ["name", "base_token_price_usd", "liquidity_usd", "volume_usd_24h"]
            ].head(10)
            st.dataframe(top_df)

            # Select token
            token_choice = st.selectbox(
                "Select a token to view details", top_df["name"]
            )
            # First row carrying the chosen name.
            token_row = df[df["name"] == token_choice].iloc[0]

            # ---------------- Token Detail View ---------------- #
            st.header(token_choice)
            pool_address = token_row["address"]

            # Fetch OHLCV Data
            ohlcv_data = fetch_api(
                f"onchain/networks/{selected_network}/pools/{pool_address}/ohlcv/day"
            )
            if ohlcv_data:
                # Walk the nested payload defensively: any missing level
                # results in an empty candle list instead of an exception.
                ohlcv_attributes = (ohlcv_data.get("data") or {}).get(
                    "attributes"
                ) or {}
                ohlcv = ohlcv_attributes.get("ohlcv_list") or []

                if ohlcv:
                    ohlcv_df = pd.DataFrame(
                        ohlcv,
                        columns=[
                            "timestamp",
                            "open",
                            "high",
                            "low",
                            "close",
                            "volume",
                        ],
                    )
                    # Timestamps are interpreted as seconds since the epoch.
                    ohlcv_df["date"] = pd.to_datetime(
                        ohlcv_df["timestamp"], unit="s"
                    )

                    # Candlesticks on the primary y-axis with volume bars
                    # overlaid on a secondary (right-hand) y-axis.
                    fig = go.Figure()
                    fig.add_trace(go.Candlestick(
                        x=ohlcv_df["date"],
                        open=ohlcv_df["open"],
                        high=ohlcv_df["high"],
                        low=ohlcv_df["low"],
                        close=ohlcv_df["close"],
                        name="Price",
                    ))
                    fig.add_trace(go.Bar(
                        x=ohlcv_df["date"],
                        y=ohlcv_df["volume"],
                        name="Volume",
                        marker_color="lightblue",
                        opacity=0.5,
                        yaxis="y2",
                    ))
                    fig.update_layout(
                        title=f"{token_choice} - OHLCV Chart",
                        xaxis=dict(title="Date", rangeslider=dict(visible=False)),
                        yaxis=dict(title="Price (USD)"),
                        yaxis2=dict(
                            title="Volume",
                            overlaying="y",
                            side="right",
                            showgrid=False,
                        ),
                        legend=dict(
                            orientation="h",
                            yanchor="bottom",
                            y=1.02,
                            xanchor="right",
                            x=1,
                        ),
                        height=600,
                    )
                    st.plotly_chart(fig, use_container_width=True)
                else:
                    st.info("No OHLCV data available for this pool.")

            # Basic Stats
            # Use the flattened numeric columns: the raw "volume_usd" field is
            # a nested dict (and may be absent), which does not match the
            # "24h Volume" label.
            st.subheader("Basic Stats")
            st.write(f"**Price (USD):** {token_row['base_token_price_usd']}")
            st.write(f"**24h Volume (USD):** {token_row['volume_usd_24h']}")
            st.write(f"**Liquidity (USD):** {token_row['liquidity_usd']}")
Editor is loading...