# Paste-site metadata (not part of the program):
# Untitled
# unknown
# plain_text
# 2 years ago
# 26 kB
# 8
# Indexable
import tkinter as tk
import threading
import logging
import time
from PIL import ImageTk, Image
import sqlite3
import requests
from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware
from decimal import Decimal
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from uniswap import Uniswap
import json
from tkinter import ttk
from concurrent.futures import ThreadPoolExecutor
# Main application window.
window = tk.Tk()
window.title("Token Monitor")
window.geometry("800x600")

# Load logo.png and show it at the top of the window.
logo_img = ImageTk.PhotoImage(Image.open("logo.png"))
logo_label = tk.Label(window, image=logo_img)
logo_label.pack(pady=20)


def _make_entry(placeholder, **options):
    """Create a 40-wide entry pre-filled with *placeholder*, pack it, return it."""
    entry = tk.Entry(window, width=40, **options)
    entry.insert(0, placeholder)
    entry.pack(pady=10)
    return entry


# Settings form: one entry per credential / parameter, packed in this order.
ethplorer_api_key_entry = _make_entry("Enter your Ethplorer API key")
etherscan_api_key_entry = _make_entry("Enter your Etherscan API key")
infura_project_id_entry = _make_entry("Enter your Infura project ID")
gmail_user_entry = _make_entry("Enter your Gmail address")
gmail_password_entry = _make_entry("Enter your Gmail password", show="*")
metamask_account_entry = _make_entry("Enter your MetaMask account address")
private_key_entry = _make_entry("Enter your MetaMask private key", show="*")
threshold_entry = _make_entry("Enter threshold percentage")

# On-screen log box.
log_text = tk.Text(window, height=10, width=80)
log_text.pack(pady=20)

# Watchlist table: the tree column (#0) holds the token, followed by three
# value columns, with a vertical scrollbar on the right.
watchlist_frame = tk.Frame(window)
watchlist_frame.pack(pady=10)
watchlist_treeview = ttk.Treeview(
    watchlist_frame,
    columns=("Price", "Threshold", "Manual Sell"),
)
watchlist_treeview.heading("#0", text="Token")
for _column in ("Price", "Threshold", "Manual Sell"):
    watchlist_treeview.heading(_column, text=_column)
watchlist_treeview.pack(side="left")
treeview_scrollbar = ttk.Scrollbar(
    watchlist_frame,
    orient="vertical",
    command=watchlist_treeview.yview,
)
treeview_scrollbar.pack(side="right", fill="y")
watchlist_treeview.configure(yscrollcommand=treeview_scrollbar.set)
# Configure logging: records at INFO and above are appended to
# token_monitor.log using a "timestamp LEVEL: message" layout.
log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
log_handler = logging.FileHandler('token_monitor.log')
log_handler.setFormatter(log_formatter)
log_handler.setLevel(logging.INFO)
# The root logger defaults to WARNING, which would discard the logging.info()
# calls used throughout this file before they ever reach the handler.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(log_handler)
def update_log(log):
    """Append *log* as a new line in the on-screen log box and log it at INFO.

    The text widget is switched to 'normal' only for the insert and set back
    to 'disabled' afterwards, then scrolled to its end.
    """
    line = log + "\n"
    log_text['state'] = 'normal'
    log_text.insert(tk.END, line)
    log_text['state'] = 'disabled'
    log_text.see(tk.END)
    logging.info(log)
def update_watchlist(token, price, threshold):
    """Append a row for *token* to the watchlist table.

    Args:
        token: text shown in the tree column of the new row.
        price: value shown in the "Price" column; may be "" when unknown.
        threshold: used as the percentage passed to calculate_threshold_price
            (NOTE(review): presumably the same percentage the user types into
            the threshold field -- confirm with callers).

    The "Threshold" column is left blank when *price* or *threshold* cannot
    be converted to a number (for example the empty strings passed when a
    token is first added), instead of raising ValueError.
    """
    try:
        threshold_price = calculate_threshold_price(float(price), float(threshold))
    except (TypeError, ValueError):
        threshold_price = ""
    watchlist_treeview.insert("", tk.END, text=token, values=(price, threshold_price, ""))
def update_manual_sell(token):
    """Write "SELL" into the "Manual Sell" column of every row for *token*.

    Treeview.set() addresses rows by item id, while rows in this table are
    created with the token as their *text*; so rows are matched on either
    their id or their text. Nothing happens when no row matches.
    """
    for item_id in watchlist_treeview.get_children():
        if item_id == token or watchlist_treeview.item(item_id, "text") == token:
            watchlist_treeview.set(item_id, "Manual Sell", "SELL")
def init():
    """Create the module-level Web3 client and router contracts, then run the UI.

    Sets the globals ``w3``, ``uniswap_router_v2`` and ``uniswap_router_v3``
    and finally blocks in ``window.mainloop()``.
    """
    global w3, uniswap_router_v2, uniswap_router_v3

    def _router_abi(weth_name):
        """Return the two-entry ABI (constructor + *weth_name* getter)."""
        return [
            {
                "inputs": [
                    {"internalType": "address", "name": "_factory", "type": "address"},
                    {"internalType": "address", "name": "_" + weth_name, "type": "address"},
                ],
                "stateMutability": "nonpayable",
                "type": "constructor",
            },
            {
                "inputs": [],
                "name": weth_name,
                "outputs": [{"internalType": "address", "name": "", "type": "address"}],
                "stateMutability": "view",
                "type": "function",
            },
        ]

    # Initialize Web3 against the Infura endpoint for the project ID in the form.
    project_id = infura_project_id_entry.get()
    w3 = Web3(HTTPProvider(f'https://mainnet.infura.io/v3/{project_id}'))
    w3.middleware_onion.inject(geth_poa_middleware, layer=0)

    # Initialize Uniswap Routers
    v2_address = '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D'  # Uniswap V2 Router address
    v3_address = '0xE592427A0AEce92De3Edee1F18E0157C05861564'  # Uniswap V3 Router address
    v2_abi = _router_abi("WETH")
    v3_abi = _router_abi("WETH9")
    uniswap_router_v2 = w3.eth.contract(address=v2_address, abi=v2_abi)
    uniswap_router_v3 = w3.eth.contract(address=v3_address, abi=v3_abi)

    # Start the UI
    window.mainloop()
# Start tracking tokens
def start_tracking():
    """Validate the settings form, publish its values and open the token database.

    On success the form values become module-level names
    (``ethplorer_api_key``, ``etherscan_api_key``, ``infura_project_id``,
    ``gmail_user``, ``gmail_password``, ``metamask_account``, ``private_key``,
    ``threshold_percentage``) together with the SQLite connection ``conn``
    and cursor ``c``, because the helper functions in this file read those
    names as globals. The ``tokens`` table is created if it does not exist.

    On invalid input a message is written to the log box and nothing else
    is changed.
    """
    global ethplorer_api_key, etherscan_api_key, infura_project_id
    global gmail_user, gmail_password, metamask_account, private_key
    global threshold_percentage, conn, c

    form_values = (
        ethplorer_api_key_entry.get(),
        etherscan_api_key_entry.get(),
        infura_project_id_entry.get(),
        gmail_user_entry.get(),
        gmail_password_entry.get(),
        metamask_account_entry.get(),
        private_key_entry.get(),
    )
    # Validate API keys and private key
    if not all(form_values):
        update_log("Please fill in all the required fields.")
        return
    # The threshold field starts out holding placeholder text, so a failed
    # conversion is reported to the user instead of raising ValueError.
    try:
        parsed_threshold = float(threshold_entry.get())
    except ValueError:
        update_log("Threshold percentage must be a number.")
        return

    (ethplorer_api_key, etherscan_api_key, infura_project_id, gmail_user,
     gmail_password, metamask_account, private_key) = form_values
    threshold_percentage = parsed_threshold

    # Create database connection (closing one left over from an earlier start).
    previous_conn = globals().get("conn")
    if previous_conn is not None:
        previous_conn.close()
    conn = sqlite3.connect('tokens.db')
    c = conn.cursor()
    c.execute("""CREATE TABLE IF NOT EXISTS tokens (
    token_address TEXT,
    purchase_price REAL,
    purchase_transaction_hash TEXT
    )""")
    conn.commit()
def get_abi(token_address):
    """Return the contract ABI for *token_address*.

    The ABI is requested from the Etherscan ``getabi`` endpoint. If the
    request fails, or the reply does not contain a JSON-encoded ABI in its
    ``result`` field, a minimal ERC-20 ABI (``balanceOf``, ``allowance``,
    ``approve``) is returned instead.
    """
    standard_erc20_abi = [
        {
            "constant": True,
            "inputs": [{"name": "_owner", "type": "address"}],
            "name": "balanceOf",
            "outputs": [{"name": "balance", "type": "uint256"}],
            "type": "function"
        },
        {
            "constant": True,
            "inputs": [
                {"name": "_owner", "type": "address"},
                {"name": "_spender", "type": "address"}
            ],
            "name": "allowance",
            "outputs": [{"name": "", "type": "uint256"}],
            "type": "function"
        },
        {
            "constant": False,
            "inputs": [
                {"name": "_spender", "type": "address"},
                {"name": "_value", "type": "uint256"}
            ],
            "name": "approve",
            "outputs": [{"name": "", "type": "bool"}],
            "type": "function"
        }
    ]
    try:
        # Try to fetch the specific ABI from Etherscan
        response = requests.get(
            f"https://api.etherscan.io/api?module=contract&action=getabi&address={token_address}",
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        return json.loads(data['result'])
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as e:
        # Only network / payload problems trigger the fallback; programming
        # errors are no longer hidden by a bare ``except``.
        logging.warning(f'Falling back to the standard ERC20 ABI for {token_address}: {str(e)}')
        return standard_erc20_abi
def init_web3():
    """Build a Web3 client for the Infura mainnet endpoint of the entered project ID.

    The ``geth_poa_middleware`` is injected at layer 0 before the client is
    returned.
    """
    project_id = infura_project_id_entry.get()
    client = Web3(HTTPProvider(f"https://mainnet.infura.io/v3/{project_id}"))
    client.middleware_onion.inject(geth_poa_middleware, layer=0)
    return client


# Module-level client used by the helper functions below.
w3 = init_web3()
def get_token_info(token):
    """Look up *token* in the ``tokens`` table.

    Returns:
        ``(token_address, purchase_price)`` for the first matching row, or
        ``(None, None)`` when there is no row or the query raises
        ``sqlite3.Error`` (which is logged).
    """
    query = "SELECT token_address, purchase_price FROM tokens WHERE token_address = ?"
    try:
        c.execute(query, (token,))
        row = c.fetchone()
    except sqlite3.Error as e:
        logging.error(f'Error retrieving token information from database: {str(e)}')
        return None, None
    if not row:
        return None, None
    return row[0], row[1]
def is_token_in_database(token):
    """Return True when get_token_info() finds an address for *token*."""
    stored_address, _unused_price = get_token_info(token)
    if stored_address is None:
        return False
    return True
def get_purchase_price_from_database(token):
    """Return the purchase price get_token_info() reports for *token* (or None)."""
    _unused_address, stored_price = get_token_info(token)
    return stored_price
def get_current_prices(tokens):
    """Fetch current prices for *tokens* from the Ethplorer ``getTokenInfo`` endpoint.

    Args:
        tokens: iterable of token address strings; they are comma-joined
            into the request URL.

    Returns:
        dict mapping each returned ``address`` to ``float(price['rate'])``.
        Entries that lack an address or a usable price are skipped. An empty
        dict is returned when the request fails or the reply is not JSON.
    """
    try:
        token_list = ','.join(tokens)
        response = requests.get(
            f'https://api.ethplorer.io/getTokenInfo/{token_list}?apiKey={ethplorer_api_key}',
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        logging.error(f'Error retrieving current prices: {str(e)}')
        return {}

    prices = {}
    for token in data:
        try:
            prices[token['address']] = float(token['price']['rate'])
        except (KeyError, TypeError, ValueError):
            # Entry without a usable address/price (e.g. price reported as
            # false); skip it rather than abort the whole batch.
            continue
    return prices
def calculate_threshold_price(purchase_price, threshold_percentage):
    """Return *purchase_price* raised by *threshold_percentage* percent."""
    multiplier = 1 + threshold_percentage / 100
    return purchase_price * multiplier
def track_wallet():
    """Poll the wallet once a minute and sell tracked tokens that hit their threshold.

    For every token reported by get_current_tokens() that has a row in the
    database, the current price is compared with the purchase price raised by
    ``threshold_percentage``. When the threshold is reached and the gas price
    is known and not above ``AVERAGE_GAS_PRICE``, the token is sold; after a
    successful sale it is removed from the database, an email is sent and its
    price-checking flag is cleared.

    The loop runs until the module-level ``stop_threads`` flag is truthy; a
    flag that was never set counts as "not stopped".

    NOTE(review): ``AVERAGE_GAS_PRICE`` is not defined in the visible part of
    this file -- confirm it exists at module level.
    """
    while not globals().get("stop_threads", False):
        try:
            for token in get_current_tokens(metamask_account):
                # Only tokens with a stored purchase price can be evaluated.
                if not is_token_in_database(token):
                    continue
                # Fetch a fresh price on every cycle; a price cached from an
                # earlier cycle could never newly cross the threshold.
                current_price = get_current_price(token)
                if current_price is None:
                    continue
                purchase_price = get_purchase_price_from_database(token)
                if purchase_price is None:
                    continue
                threshold_price = calculate_threshold_price(purchase_price, threshold_percentage)
                if current_price < threshold_price:
                    continue
                gas_price = get_gas_price()
                # get_gas_price() returns None on failure; never compare None.
                if gas_price is None or gas_price > AVERAGE_GAS_PRICE:
                    continue
                gas_spent, profit = sell_token(token)
                if gas_spent is not None and profit is not None:
                    remove_from_database(token)
                    send_email(token, profit, gas_spent)
                    stop_price_checking_thread(token)  # Stop tracking thread after selling and converting to ETH
        except Exception as e:
            logging.error(f'An error occurred while tracking the wallet: {str(e)}')
        # Sleep after failures too, so a persistent error cannot turn the
        # loop into a busy spin that hammers the APIs and the log file.
        time.sleep(60)
def get_current_tokens(account):
    """Return the token contract addresses held by *account*.

    Queries the Ethplorer ``getAddressInfo`` endpoint and collects
    ``tokenInfo.address`` from each entry of the reply's ``tokens`` list.

    Returns:
        list of address strings; empty when the reply has no ``tokens`` key,
        when the request fails, or when the payload is malformed.
    """
    try:
        response = requests.get(
            f'https://api.ethplorer.io/getAddressInfo/{account}?apiKey={ethplorer_api_key}',
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        # A reply without a "tokens" key is treated as "no tokens held".
        tokens = data.get('tokens') or []
        return [token['tokenInfo']['address'] for token in tokens]
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError, AttributeError) as e:
        logging.error(f'Error retrieving current tokens: {str(e)}')
        return []
def get_current_price(token):
    """Return the current ``price.rate`` of *token* from Ethplorer as a float.

    Returns:
        The rate, or None when the request fails, the payload is malformed,
        or the reply carries no price data (a falsy ``price`` field).
    """
    try:
        response = requests.get(
            f'https://api.ethplorer.io/getTokenInfo/{token}?apiKey={ethplorer_api_key}',
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        price = data.get('price')
        if not price:
            # No price data available for this token.
            return None
        return float(price['rate'])
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError, AttributeError) as e:
        logging.error(f'Error retrieving current price for {token}: {str(e)}')
        return None
def get_gas_price():
    """Return the ``safeLow`` value from the gas-price endpoint divided by 10**9.

    Returns None (after logging) when the HTTP request fails.
    """
    endpoint = f'https://api.ethplorer.io/getGasPrice?apiKey={ethplorer_api_key}'
    try:
        reply = requests.get(endpoint)
        reply.raise_for_status()
        payload = reply.json()
        safe_low = float(payload['safeLow'])
        return safe_low / 10**9  # Convert from Wei to Gwei
    except requests.exceptions.RequestException as e:
        logging.error(f'Error retrieving gas price: {str(e)}')
        return None
def get_uniswap_version_with_more_liquidity(token):
    """Return 2 or 3 for the Uniswap version reporting more liquidity for *token*.

    Returns None when both versions report the same liquidity or when any
    step raises (the error is logged).
    """
    try:
        v2_client = Uniswap(private_key, w3, version=2)
        v3_client = Uniswap(private_key, w3, version=3)
        checksum_address = Web3.toChecksumAddress(token)
        contract = w3.eth.contract(address=checksum_address, abi=get_abi(checksum_address))
        v2_liquidity = v2_client.get_liquidity(contract)
        v3_liquidity = v3_client.get_liquidity(contract)
        if v2_liquidity > v3_liquidity:
            return 2
        if v3_liquidity > v2_liquidity:
            return 3
        return None
    except Exception as e:
        logging.error(f'An error occurred while checking Uniswap version with more liquidity for {token}: {str(e)}')
        return None
def sell_token(token):
    """Approve the router if needed, then swap the wallet's whole *token* balance.

    Steps, as written below:
      1. pick the Uniswap version via get_uniswap_version_with_more_liquidity();
      2. if the router's allowance is 0, send and wait for an ``approve``
         transaction;
      3. bail out when the balance is 0 or get_amounts_out() reports 0;
      4. build, sign and send the swap transaction and wait for its receipt.

    Returns:
        ``(gas_spent, profit)`` on success; ``(None, None)`` when no version
        is chosen, there is nothing to sell, there is no liquidity, or any
        exception is raised (the exception is logged).
    """
    try:
        uniswap_version = get_uniswap_version_with_more_liquidity(token)
        if uniswap_version == 2:
            uniswap_v2 = Uniswap(private_key, w3, version=2)
            uniswap_router_address = '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D' # Uniswap V2 Router address
            uniswap = uniswap_v2
        elif uniswap_version == 3:
            uniswap_v3 = Uniswap(private_key, w3, version=3)
            uniswap_router_address = '0xE592427A0AEce92De3Edee1F18E0157C05861564' # Uniswap V3 Router address
            uniswap = uniswap_v3
        else:
            logging.info(f'No Uniswap version with more liquidity found for {token}.')
            return None, None
        token_address = Web3.toChecksumAddress(token)
        token_contract = w3.eth.contract(address=token_address, abi=get_abi(token_address))
        # Approve the router only when it currently has no allowance at all.
        allowance = token_contract.functions.allowance(metamask_account, uniswap_router_address).call()
        if allowance == 0:
            gas_price = w3.eth.gas_price
            nonce = w3.eth.getTransactionCount(metamask_account)
            txn_dict = token_contract.functions.approve(uniswap_router_address, Web3.toWei(1e18, 'ether')).buildTransaction({
                'chainId': 1,
                'gas': 50000,
                'gasPrice': gas_price,
                'nonce': nonce,
            })
            signed_txn = w3.eth.account.signTransaction(txn_dict, private_key=private_key)
            txn_hash = w3.eth.sendRawTransaction(signed_txn.rawTransaction)
            txn_receipt = w3.eth.waitForTransactionReceipt(txn_hash)
            logging.info("Approved Uniswap to spend tokens\n" + str(txn_receipt))
        balance = token_contract.functions.balanceOf(metamask_account).call()
        if balance == 0:
            logging.info(f"No {token} tokens to sell.")
            return None, None
        # NOTE(review): the path ends at w3.eth.default_account (a wallet
        # address); a token->ETH route presumably needs the WETH token
        # address here -- confirm.
        path = [w3.toChecksumAddress(token), w3.toChecksumAddress(w3.eth.default_account)]
        liquidity = uniswap.get_amounts_out(1, path)
        if liquidity[1] == 0:
            logging.info(f"No liquidity for {token} in Uniswap pool.")
            return None, None
        # Call ethgasstation API for dynamic gas pricing
        response = requests.get("https://ethgasstation.info/api/ethgasAPI.json")
        data = response.json()
        # NOTE(review): the multiplier is 10**8, not 10**9; presumably the
        # API reports tenths of a gwei -- confirm against the API docs.
        gas_price = int(data['average']) * 10**8 # Convert gwei to wei
        nonce = w3.eth.getTransactionCount(metamask_account)
        # NOTE(review): `uniswap_router` is not assigned in this function and
        # no module-level name of that spelling is visible in this file (only
        # uniswap_router_v2 / uniswap_router_v3) -- confirm.
        txn_dict = uniswap_router.functions.swapExactTokensForETHSupportingFeeOnTransferTokens(
            balance, # amountIn
            0, # amountOutMin, set to 0 to accept any amount
            [token_address, w3.toChecksumAddress(w3.eth.default_account)], # path
            metamask_account, # recipient
            int(time.time()) + 10 * 60 # deadline
        ).buildTransaction({
            'chainId': 1,
            'gas': 500000,
            'gasPrice': gas_price,
            'nonce': nonce,
        })
        signed_txn = w3.eth.account.signTransaction(txn_dict, private_key=private_key)
        txn_hash = w3.eth.sendRawTransaction(signed_txn.rawTransaction)
        txn_receipt = w3.eth.waitForTransactionReceipt(txn_hash)
        logging.info(f"Sold {balance} {token} tokens\n" + str(txn_receipt))
        # Gas used by the swap multiplied by the gas price expressed in gwei.
        gas_spent = txn_receipt['gasUsed'] * w3.fromWei(txn_dict['gasPrice'], 'gwei')
        # NOTE(review): cumulativeGasUsed is a gas figure from the receipt,
        # not the ETH received from the swap, so this "profit" does not look
        # like sale proceeds minus cost -- confirm the intended formula.
        profit = w3.fromWei(txn_receipt['cumulativeGasUsed'], 'ether') - get_purchase_price_from_database(token)
        return gas_spent, profit
    except Exception as e:
        logging.error(f'An error occurred while selling token {token}: {str(e)}')
        return None, None
def remove_from_database(token):
    """Delete every ``tokens`` row whose address equals *token* and commit.

    A ``sqlite3.Error`` is logged and swallowed.
    """
    delete_sql = "DELETE FROM tokens WHERE token_address = ?"
    params = (token,)
    try:
        c.execute(delete_sql, params)
        conn.commit()
        logging.info(f'Token {token} removed from the database.')
    except sqlite3.Error as e:
        logging.error(f'Error removing token from database: {str(e)}')
def send_email(token, profit, gas_spent):
    """Email a "Token Sold" notification from the Gmail account to itself.

    Args:
        token: token identifier placed in the message body.
        profit: profit figure placed in the body (labelled ETH).
        gas_spent: gas figure placed in the body (labelled Gwei).

    Sending is best-effort: SMTP and connection errors are logged, not
    raised. The SMTP session is managed by a ``with`` block so it is closed
    even when STARTTLS, login or sending fails.
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = gmail_user
        msg['To'] = gmail_user
        msg['Subject'] = 'Token Sold'
        body = f'Token {token} has been sold. Profit: {profit} ETH. Gas spent: {gas_spent} Gwei.'
        msg.attach(MIMEText(body, 'plain'))
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
            server.starttls()
            server.login(gmail_user, gmail_password)
            server.sendmail(gmail_user, gmail_user, msg.as_string())
        logging.info('Email notification sent.')
    except (smtplib.SMTPException, OSError) as e:
        # OSError covers DNS / connection / timeout failures raised before
        # any SMTP-level exchange takes place.
        logging.error(f'Error sending email notification: {str(e)}')
def show_wallet_balance():
    """Fetch the wallet's ETH balance, write it to the log box and return it.

    The Ethplorer ``getAddressInfo`` reply is read as follows: when
    ``ETH.rawBalance`` (wei) is present it is divided by 10**18; otherwise
    ``ETH.balance`` is used as-is, since it is already expressed in ETH.

    Returns:
        The balance in ETH as a float, or None when the request fails or the
        payload is malformed (the error is logged).
    """
    try:
        response = requests.get(
            f'https://api.ethplorer.io/getAddressInfo/{metamask_account}?apiKey={ethplorer_api_key}',
            timeout=10,
        )
        response.raise_for_status()
        eth_info = response.json()['ETH']
        raw_balance = eth_info.get('rawBalance')
        if raw_balance is not None:
            balance = int(raw_balance) / 10**18
        else:
            balance = float(eth_info['balance'])
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError, AttributeError) as e:
        logging.error(f'Error retrieving wallet balance: {str(e)}')
        return None
    # This function is wired to a button, whose callback return value is
    # discarded, so the result is also shown in the log box.
    update_log(f'Wallet balance: {balance} ETH')
    return balance
def clear_log():
    """Erase the whole content of the on-screen log box.

    The widget is made editable for the deletion and disabled again after.
    """
    log_text['state'] = 'normal'
    log_text.delete("1.0", tk.END)
    log_text['state'] = 'disabled'
def stop_tracking():
    """Set the module-level ``stop_threads`` flag to True and log the stop."""
    global stop_threads
    stop_message = 'Token tracking stopped.'
    stop_threads = True
    logging.info(stop_message)
def stop_price_checking_thread(token):
    """Clear the running flag kept for *token* in the ``threads`` registry.

    ``threads`` is a module-level dict keyed by token. If it has not been
    created yet it is initialised to an empty dict instead of raising
    NameError. Tokens without an entry are ignored.
    """
    global threads
    try:
        registry = threads
    except NameError:
        registry = threads = {}
    if token in registry:
        registry[token] = False
        logging.info(f'Price checking stopped for token {token}.')
def manual_sell(token):
    """Ask for confirmation and, if confirmed, sell *token* immediately.

    After a successful sale the token is removed from the database, an email
    is sent, its price-checking flag is cleared and the result is written to
    the log box.
    """
    # ``import tkinter as tk`` does not load the messagebox submodule, so
    # ``tk.messagebox`` is only available once it has been imported.
    from tkinter import messagebox

    confirmed = messagebox.askyesno("Manual Sell", f"Do you want to manually sell {token}?")
    if not confirmed:
        return
    gas_spent, profit = sell_token(token)
    if gas_spent is not None and profit is not None:
        remove_from_database(token)
        send_email(token, profit, gas_spent)
        stop_price_checking_thread(token)  # Stop tracking thread after selling and converting to ETH
        update_log(f"Token {token} manually sold. Profit: {profit} ETH. Gas spent: {gas_spent} Gwei.")
def add_token_to_watchlist():
    """Add the token typed into the token entry to the watchlist.

    The entry text is stripped; nothing happens when it is empty. After the
    row has been added (with empty price and threshold) the entry is cleared.
    """
    token = token_entry.get().strip()
    if not token:
        return
    update_watchlist(token, "", "")
    token_entry.delete(0, tk.END)
def remove_token_from_watchlist():
    """Remove every selected row from the watchlist.

    Each selected row is handled on its own, so a multi-row selection works
    (``selection()`` returns a tuple of item ids). For each row the
    price-checking flag of its token is cleared and the removal is logged.
    """
    for item_id in watchlist_treeview.selection():
        token = watchlist_treeview.item(item_id, 'text')
        watchlist_treeview.delete(item_id)
        stop_price_checking_thread(token)
        logging.info(f'Token {token} removed from watchlist.')
def update_threshold():
    """Recompute the "Threshold" column of every row from the entered percentage.

    The new threshold is derived from the row's *price* (first value column)
    rather than from the previous threshold, so applying the update several
    times does not compound. Rows without a numeric price are left
    untouched. A non-numeric percentage is reported in the log box.
    """
    try:
        new_threshold_percentage = float(threshold_entry.get())
    except ValueError:
        update_log("Threshold percentage must be a number.")
        return
    for child in watchlist_treeview.get_children():
        values = watchlist_treeview.item(child, 'values')
        try:
            price = float(values[0])
        except (IndexError, TypeError, ValueError):
            # Row has no values or no numeric price yet.
            continue
        new_threshold_price = price * (1 + new_threshold_percentage / 100)
        watchlist_treeview.set(child, "Threshold", f"{new_threshold_price:.6f}")
def update_manual_sell_button():
    """Mark the selected watchlist row as "SELL".

    Does nothing without a selection. Otherwise the "Manual Sell" column of
    the selection is set to "SELL" and update_manual_sell() is called with
    the row's token text.
    """
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    row = watchlist_treeview.item(selected_item)
    token = row['text']
    watchlist_treeview.set(selected_item, "Manual Sell", "SELL")
    update_manual_sell(token)
def create_menu(event):
    """Pop up the watchlist context menu at the pointer position.

    The menu is only posted while a row is selected; *event* supplies the
    screen coordinates (``x_root``, ``y_root``).
    """
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    # The token text is looked up but not used any further here.
    token = watchlist_treeview.item(selected_item)['text']
    menu.post(event.x_root, event.y_root)
def copy_address():
    """Copy the selected row's token text to the clipboard and log it."""
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    token = watchlist_treeview.item(selected_item)['text']
    window.clipboard_clear()
    window.clipboard_append(token)
    logging.info(f'Token {token} address copied to clipboard.')
def copy_price():
    """Copy the selected row's first value (Price) to the clipboard and log it."""
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    row_values = watchlist_treeview.item(selected_item)['values']
    price = row_values[0]
    window.clipboard_clear()
    window.clipboard_append(price)
    logging.info(f'Price {price} copied to clipboard.')
def copy_threshold():
    """Copy the selected row's second value (Threshold) to the clipboard and log it."""
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    row_values = watchlist_treeview.item(selected_item)['values']
    threshold = row_values[1]
    window.clipboard_clear()
    window.clipboard_append(threshold)
    logging.info(f'Threshold {threshold} copied to clipboard.')
def copy_manual_sell():
    """Copy the selected row's third value (Manual Sell) to the clipboard and log it."""
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    row_values = watchlist_treeview.item(selected_item)['values']
    sell_flag = row_values[2]
    window.clipboard_clear()
    window.clipboard_append(sell_flag)
    logging.info(f'Manual Sell {sell_flag} copied to clipboard.')
# Entry for the token to add to the watchlist.
token_entry = tk.Entry(window, width=40)
token_entry.pack(pady=10)

# Watchlist management buttons.
add_token_button = tk.Button(window, text="Add Token", command=add_token_to_watchlist)
add_token_button.pack(pady=5)
remove_token_button = tk.Button(window, text="Remove Token", command=remove_token_from_watchlist)
remove_token_button.pack(pady=5)
threshold_update_button = tk.Button(window, text="Update Threshold", command=update_threshold)
threshold_update_button.pack(pady=5)
manual_sell_button = tk.Button(window, text="Manual Sell", command=update_manual_sell_button)
manual_sell_button.pack(pady=5)

# Right-click context menu for the watchlist: one "copy" command per column.
menu = tk.Menu(window, tearoff=0)
for _label, _handler in (
    ("Copy Token Address", copy_address),
    ("Copy Price", copy_price),
    ("Copy Threshold", copy_threshold),
    ("Copy Manual Sell", copy_manual_sell),
):
    menu.add_command(label=_label, command=_handler)
watchlist_treeview.bind("<Button-3>", create_menu)

# Tracking controls and utilities.
start_button = tk.Button(window, text="Start Tracking", command=start_tracking)
start_button.pack(pady=10)
stop_button = tk.Button(window, text="Stop Tracking", command=stop_tracking)
stop_button.pack(pady=5)
balance_button = tk.Button(window, text="Wallet Balance", command=show_wallet_balance)
balance_button.pack(pady=5)
clear_button = tk.Button(window, text="Clear Log", command=clear_log)
clear_button.pack(pady=5)
if __name__ == "__main__":
    try:
        # init() creates the Web3 objects and then blocks in
        # window.mainloop(); it has to run before the window is shown, not
        # after the event loop has already finished.
        init()
    finally:
        # Clean up database connection, if a module-level one was opened.
        _open_conn = globals().get("conn")
        if _open_conn is not None:
            _open_conn.close()