Untitled
unknown
plain_text
2 years ago
26 kB
7
Indexable
import tkinter as tk import threading import logging import time from PIL import ImageTk, Image import sqlite3 import requests from web3 import Web3, HTTPProvider from web3.middleware import geth_poa_middleware from decimal import Decimal import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from uniswap import Uniswap import json from tkinter import ttk from concurrent.futures import ThreadPoolExecutor # Set up the window window = tk.Tk() window.title("Token Monitor") window.geometry("800x600") # Load and display logo image logo_img = ImageTk.PhotoImage(Image.open("logo.png")) logo_label = tk.Label(window, image=logo_img) logo_label.pack(pady=20) # Input fields ethplorer_api_key_entry = tk.Entry(window, width=40) ethplorer_api_key_entry.insert(0, "Enter your Ethplorer API key") ethplorer_api_key_entry.pack(pady=10) etherscan_api_key_entry = tk.Entry(window, width=40) etherscan_api_key_entry.insert(0, "Enter your Etherscan API key") etherscan_api_key_entry.pack(pady=10) infura_project_id_entry = tk.Entry(window, width=40) infura_project_id_entry.insert(0, "Enter your Infura project ID") infura_project_id_entry.pack(pady=10) gmail_user_entry = tk.Entry(window, width=40) gmail_user_entry.insert(0, "Enter your Gmail address") gmail_user_entry.pack(pady=10) gmail_password_entry = tk.Entry(window, width=40, show="*") gmail_password_entry.insert(0, "Enter your Gmail password") gmail_password_entry.pack(pady=10) metamask_account_entry = tk.Entry(window, width=40) metamask_account_entry.insert(0, "Enter your MetaMask account address") metamask_account_entry.pack(pady=10) private_key_entry = tk.Entry(window, width=40, show="*") private_key_entry.insert(0, "Enter your MetaMask private key") private_key_entry.pack(pady=10) threshold_entry = tk.Entry(window, width=40) threshold_entry.insert(0, "Enter threshold percentage") threshold_entry.pack(pady=10) # Log display log_text = tk.Text(window, height=10, width=80) log_text.pack(pady=20) # Watchlist Treeview watchlist_frame = tk.Frame(window) watchlist_frame.pack(pady=10) watchlist_treeview = ttk.Treeview(watchlist_frame, columns=("Price", "Threshold", "Manual Sell")) watchlist_treeview.heading("#0", text="Token") watchlist_treeview.heading("Price", text="Price") watchlist_treeview.heading("Threshold", text="Threshold") watchlist_treeview.heading("Manual Sell", text="Manual Sell") watchlist_treeview.pack(side="left") treeview_scrollbar = ttk.Scrollbar(watchlist_frame, orient="vertical", command=watchlist_treeview.yview) treeview_scrollbar.pack(side="right", fill="y") watchlist_treeview.configure(yscrollcommand=treeview_scrollbar.set) # Configure logging log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s') log_handler = logging.FileHandler('token_monitor.log') log_handler.setFormatter(log_formatter) log_handler.setLevel(logging.INFO) logging.getLogger().addHandler(log_handler) def update_log(log): log_text.configure(state='normal') log_text.insert(tk.END, log + "\n") log_text.configure(state='disabled') log_text.see(tk.END) logging.info(log) def update_watchlist(token, price, threshold): threshold_price = calculate_threshold_price(float(price), threshold_percentage) watchlist_treeview.insert("", tk.END, text=token, values=(price, threshold_price, "")) def update_manual_sell(token): watchlist_treeview.set(token, "Manual Sell", "SELL") def init(): global w3, uniswap_router_v2, uniswap_router_v3 # Initialize Web3 infura_project_id = infura_project_id_entry.get() w3 = Web3(HTTPProvider(f'https://mainnet.infura.io/v3/{infura_project_id}')) w3.middleware_onion.inject(geth_poa_middleware, layer=0) # Initialize Uniswap Routers uniswap_router_v2_address = '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D' # Uniswap V2 Router address uniswap_router_v3_address = '0xE592427A0AEce92De3Edee1F18E0157C05861564' # Uniswap V3 Router address uniswap_router_v2_abi = [{"inputs":[{"internalType":"address","name":"_factory","type":"address"},{"internalType":"address","name":"_WETH","type":"address"}],"stateMutability":"nonpayable","type":"constructor"},{"inputs":[],"name":"WETH","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"}] uniswap_router_v3_abi = [{"inputs":[{"internalType":"address","name":"_factory","type":"address"},{"internalType":"address","name":"_WETH9","type":"address"}],"stateMutability":"nonpayable","type":"constructor"},{"inputs":[],"name":"WETH9","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"}] uniswap_router_v2 = w3.eth.contract(address=uniswap_router_v2_address, abi=uniswap_router_v2_abi) uniswap_router_v3 = w3.eth.contract(address=uniswap_router_v3_address, abi=uniswap_router_v3_abi) # Start the UI window.mainloop() # Start tracking tokens def start_tracking(): ethplorer_api_key = ethplorer_api_key_entry.get() etherscan_api_key = etherscan_api_key_entry.get() infura_project_id = infura_project_id_entry.get() gmail_user = gmail_user_entry.get() gmail_password = gmail_password_entry.get() metamask_account = metamask_account_entry.get() private_key = private_key_entry.get() threshold_percentage = float(threshold_entry.get()) # Validate API keys and private key if not (ethplorer_api_key and etherscan_api_key and infura_project_id and gmail_user and gmail_password and metamask_account and private_key): update_log("Please fill in all the required fields.") return # Create database connection with sqlite3.connect('tokens.db') as conn: c = conn.cursor() c.execute("""CREATE TABLE IF NOT EXISTS tokens ( token_address TEXT, purchase_price REAL, purchase_transaction_hash TEXT )""") def get_abi(token_address): standard_erc20_abi = [ { "constant": True, "inputs": [{"name": "_owner", "type": "address"}], "name": "balanceOf", "outputs": [{"name": "balance", "type": "uint256"}], "type": "function" }, { "constant": True, "inputs": [ {"name": "_owner", "type": "address"}, {"name": "_spender", "type": "address"} ], "name": "allowance", "outputs": [{"name": "", "type": "uint256"}], "type": "function" }, { "constant": False, "inputs": [ {"name": "_spender", "type": "address"}, {"name": "_value", "type": "uint256"} ], "name": "approve", "outputs": [{"name": "", "type": "bool"}], "type": "function" } ] try: # Try to fetch the specific ABI from Etherscan response = requests.get(f"https://api.etherscan.io/api?module=contract&action=getabi&address={token_address}") response.raise_for_status() data = response.json() return json.loads(data['result']) except: # If that fails, fall back to the standard ERC20 ABI return standard_erc20_abi def init_web3(): infura_project_id = infura_project_id_entry.get() provider_url = f"https://mainnet.infura.io/v3/{infura_project_id}" w3 = Web3(HTTPProvider(provider_url)) w3.middleware_onion.inject(geth_poa_middleware, layer=0) return w3 w3 = init_web3() def get_token_info(token): try: c.execute("SELECT token_address, purchase_price FROM tokens WHERE token_address = ?", (token,)) result = c.fetchone() if result: return result[0], result[1] else: return None, None except sqlite3.Error as e: logging.error(f'Error retrieving token information from database: {str(e)}') return None, None def is_token_in_database(token): token_address, _ = get_token_info(token) return token_address is not None def get_purchase_price_from_database(token): _, purchase_price = get_token_info(token) return purchase_price def get_current_prices(tokens): try: token_list = ','.join(tokens) response = requests.get(f'https://api.ethplorer.io/getTokenInfo/{token_list}?apiKey={ethplorer_api_key}') response.raise_for_status() data = response.json() prices = {} for token in data: token_address = token['address'] current_price = float(token['price']['rate']) prices[token_address] = current_price return prices except requests.exceptions.RequestException as e: logging.error(f'Error retrieving current prices: {str(e)}') return {} def calculate_threshold_price(purchase_price, threshold_percentage): return purchase_price * (1 + threshold_percentage / 100) def track_wallet(): global stop_threads token_cache = {} while not stop_threads: try: current_tokens = get_current_tokens(metamask_account) for token in current_tokens: if not is_token_in_database(token): if token not in token_cache: token_cache[token] = get_current_price(token) current_price = token_cache[token] if current_price is not None: purchase_price = get_purchase_price_from_database(token) threshold_price = calculate_threshold_price(purchase_price, threshold_percentage) if current_price >= threshold_price and get_gas_price() <= AVERAGE_GAS_PRICE: gas_spent, profit = sell_token(token) if gas_spent is not None and profit is not None: remove_from_database(token) send_email(token, profit, gas_spent) stop_price_checking_thread(token) # Stop tracking thread after selling and converting to ETH time.sleep(60) except Exception as e: logging.error(f'An error occurred while tracking the wallet: {str(e)}') def get_current_tokens(account): try: response = requests.get(f'https://api.ethplorer.io/getAddressInfo/{account}?apiKey={ethplorer_api_key}') response.raise_for_status() data = response.json() tokens = data['tokens'] return [token['tokenInfo']['address'] for token in tokens] except requests.exceptions.RequestException as e: logging.error(f'Error retrieving current tokens: {str(e)}') return [] def get_current_price(token): try: response = requests.get(f'https://api.ethplorer.io/getTokenInfo/{token}?apiKey={ethplorer_api_key}') response.raise_for_status() data = response.json() return float(data['price']['rate']) except requests.exceptions.RequestException as e: logging.error(f'Error retrieving current price for {token}: {str(e)}') return None def get_gas_price(): try: response = requests.get(f'https://api.ethplorer.io/getGasPrice?apiKey={ethplorer_api_key}') response.raise_for_status() data = response.json() return float(data['safeLow']) / 10**9 # Convert from Wei to Gwei except requests.exceptions.RequestException as e: logging.error(f'Error retrieving gas price: {str(e)}') return None def get_uniswap_version_with_more_liquidity(token): try: uniswap_v2 = Uniswap(private_key, w3, version=2) uniswap_v3 = Uniswap(private_key, w3, version=3) token_address = Web3.toChecksumAddress(token) token_contract = w3.eth.contract(address=token_address, abi=get_abi(token_address)) liquidity_v2 = uniswap_v2.get_liquidity(token_contract) liquidity_v3 = uniswap_v3.get_liquidity(token_contract) if liquidity_v2 > liquidity_v3: return 2 elif liquidity_v3 > liquidity_v2: return 3 else: return None except Exception as e: logging.error(f'An error occurred while checking Uniswap version with more liquidity for {token}: {str(e)}') return None def sell_token(token): try: uniswap_version = get_uniswap_version_with_more_liquidity(token) if uniswap_version == 2: uniswap_v2 = Uniswap(private_key, w3, version=2) uniswap_router_address = '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D' # Uniswap V2 Router address uniswap = uniswap_v2 elif uniswap_version == 3: uniswap_v3 = Uniswap(private_key, w3, version=3) uniswap_router_address = '0xE592427A0AEce92De3Edee1F18E0157C05861564' # Uniswap V3 Router address uniswap = uniswap_v3 else: logging.info(f'No Uniswap version with more liquidity found for {token}.') return None, None token_address = Web3.toChecksumAddress(token) token_contract = w3.eth.contract(address=token_address, abi=get_abi(token_address)) allowance = token_contract.functions.allowance(metamask_account, uniswap_router_address).call() if allowance == 0: gas_price = w3.eth.gas_price nonce = w3.eth.getTransactionCount(metamask_account) txn_dict = token_contract.functions.approve(uniswap_router_address, Web3.toWei(1e18, 'ether')).buildTransaction({ 'chainId': 1, 'gas': 50000, 'gasPrice': gas_price, 'nonce': nonce, }) signed_txn = w3.eth.account.signTransaction(txn_dict, private_key=private_key) txn_hash = w3.eth.sendRawTransaction(signed_txn.rawTransaction) txn_receipt = w3.eth.waitForTransactionReceipt(txn_hash) logging.info("Approved Uniswap to spend tokens\n" + str(txn_receipt)) balance = token_contract.functions.balanceOf(metamask_account).call() if balance == 0: logging.info(f"No {token} tokens to sell.") return None, None path = [w3.toChecksumAddress(token), w3.toChecksumAddress(w3.eth.default_account)] liquidity = uniswap.get_amounts_out(1, path) if liquidity[1] == 0: logging.info(f"No liquidity for {token} in Uniswap pool.") return None, None # Call ethgasstation API for dynamic gas pricing response = requests.get("https://ethgasstation.info/api/ethgasAPI.json") data = response.json() gas_price = int(data['average']) * 10**8 # Convert gwei to wei nonce = w3.eth.getTransactionCount(metamask_account) txn_dict = uniswap_router.functions.swapExactTokensForETHSupportingFeeOnTransferTokens( balance, # amountIn 0, # amountOutMin, set to 0 to accept any amount [token_address, w3.toChecksumAddress(w3.eth.default_account)], # path metamask_account, # recipient int(time.time()) + 10 * 60 # deadline ).buildTransaction({ 'chainId': 1, 'gas': 500000, 'gasPrice': gas_price, 'nonce': nonce, }) signed_txn = w3.eth.account.signTransaction(txn_dict, private_key=private_key) txn_hash = w3.eth.sendRawTransaction(signed_txn.rawTransaction) txn_receipt = w3.eth.waitForTransactionReceipt(txn_hash) logging.info(f"Sold {balance} {token} tokens\n" + str(txn_receipt)) gas_spent = txn_receipt['gasUsed'] * w3.fromWei(txn_dict['gasPrice'], 'gwei') profit = w3.fromWei(txn_receipt['cumulativeGasUsed'], 'ether') - get_purchase_price_from_database(token) return gas_spent, profit except Exception as e: logging.error(f'An error occurred while selling token {token}: {str(e)}') return None, None def remove_from_database(token): try: c.execute("DELETE FROM tokens WHERE token_address = ?", (token,)) conn.commit() logging.info(f'Token {token} removed from the database.') except sqlite3.Error as e: logging.error(f'Error removing token from database: {str(e)}') def send_email(token, profit, gas_spent): try: msg = MIMEMultipart() msg['From'] = gmail_user msg['To'] = gmail_user msg['Subject'] = 'Token Sold' body = f'Token {token} has been sold. Profit: {profit} ETH. Gas spent: {gas_spent} Gwei.' msg.attach(MIMEText(body, 'plain')) server = smtplib.SMTP('smtp.gmail.com', 587) server.starttls() server.login(gmail_user, gmail_password) text = msg.as_string() server.sendmail(gmail_user, gmail_user, text) server.quit() logging.info('Email notification sent.') except smtplib.SMTPException as e: logging.error(f'Error sending email notification: {str(e)}') def show_wallet_balance(): try: response = requests.get(f'https://api.ethplorer.io/getAddressInfo/{metamask_account}?apiKey={ethplorer_api_key}') response.raise_for_status() data = response.json() balance = float(data['ETH']['balance']) / 10**18 return balance except requests.exceptions.RequestException as e: logging.error(f'Error retrieving wallet balance: {str(e)}') return None def clear_log(): log_text.configure(state='normal') log_text.delete("1.0", tk.END) log_text.configure(state='disabled') def stop_tracking(): global stop_threads stop_threads = True logging.info('Token tracking stopped.') def stop_price_checking_thread(token): global threads if token in threads: threads[token] = False logging.info(f'Price checking stopped for token {token}.') def manual_sell(token): response = tk.messagebox.askyesno("Manual Sell", f"Do you want to manually sell {token}?") if response == tk.YES: gas_spent, profit = sell_token(token) if gas_spent is not None and profit is not None: remove_from_database(token) send_email(token, profit, gas_spent) stop_price_checking_thread(token) # Stop tracking thread after selling and converting to ETH update_log(f"Token {token} manually sold. Profit: {profit} ETH. Gas spent: {gas_spent} Gwei.") def add_token_to_watchlist(): token = token_entry.get().strip() if token: update_watchlist(token, "", "") token_entry.delete(0, tk.END) def remove_token_from_watchlist(): selected_item = watchlist_treeview.selection() if selected_item: token = watchlist_treeview.item(selected_item)['text'] watchlist_treeview.delete(selected_item) stop_price_checking_thread(token) logging.info(f'Token {token} removed from watchlist.') def update_threshold(): new_threshold_percentage = float(threshold_entry.get()) for child in watchlist_treeview.get_children(): current_threshold = watchlist_treeview.item(child)['values'][1] if current_threshold != "": current_threshold_price = float(current_threshold) new_threshold_price = current_threshold_price * (1 + new_threshold_percentage / 100) watchlist_treeview.set(child, "Threshold", f"{new_threshold_price:.6f}") def update_manual_sell_button(): selected_item = watchlist_treeview.selection() if selected_item: token = watchlist_treeview.item(selected_item)['text'] watchlist_treeview.set(selected_item, "Manual Sell", "SELL") update_manual_sell(token) def create_menu(event): selected_item = watchlist_treeview.selection() if selected_item: token = watchlist_treeview.item(selected_item)['text'] menu.post(event.x_root, event.y_root) def copy_address(): selected_item = watchlist_treeview.selection() if selected_item: token = watchlist_treeview.item(selected_item)['text'] window.clipboard_clear() window.clipboard_append(token) logging.info(f'Token {token} address copied to clipboard.') def copy_price(): selected_item = watchlist_treeview.selection() if selected_item: price = watchlist_treeview.item(selected_item)['values'][0] window.clipboard_clear() window.clipboard_append(price) logging.info(f'Price {price} copied to clipboard.') def copy_threshold(): selected_item = watchlist_treeview.selection() if selected_item: threshold = watchlist_treeview.item(selected_item)['values'][1] window.clipboard_clear() window.clipboard_append(threshold) logging.info(f'Threshold {threshold} copied to clipboard.') def copy_manual_sell(): selected_item = watchlist_treeview.selection() if selected_item: manual_sell = watchlist_treeview.item(selected_item)['values'][2] window.clipboard_clear() window.clipboard_append(manual_sell) logging.info(f'Manual Sell {manual_sell} copied to clipboard.') # Add token entry token_entry = tk.Entry(window, width=40) token_entry.pack(pady=10) # Add token button add_token_button = tk.Button(window, text="Add Token", command=add_token_to_watchlist) add_token_button.pack(pady=5) # Remove token button remove_token_button = tk.Button(window, text="Remove Token", command=remove_token_from_watchlist) remove_token_button.pack(pady=5) # Threshold update button threshold_update_button = tk.Button(window, text="Update Threshold", command=update_threshold) threshold_update_button.pack(pady=5) # Manual sell button manual_sell_button = tk.Button(window, text="Manual Sell", command=update_manual_sell_button) manual_sell_button.pack(pady=5) # Watchlist right-click menu menu = tk.Menu(window, tearoff=0) menu.add_command(label="Copy Token Address", command=copy_address) menu.add_command(label="Copy Price", command=copy_price) menu.add_command(label="Copy Threshold", command=copy_threshold) menu.add_command(label="Copy Manual Sell", command=copy_manual_sell) # Watchlist Treeview bindings watchlist_treeview.bind("<Button-3>", create_menu) # Start tracking button start_button = tk.Button(window, text="Start Tracking", command=start_tracking) start_button.pack(pady=10) # Stop tracking button stop_button = tk.Button(window, text="Stop Tracking", command=stop_tracking) stop_button.pack(pady=5) # Wallet balance button balance_button = tk.Button(window, text="Wallet Balance", command=show_wallet_balance) balance_button.pack(pady=5) # Clear log button clear_button = tk.Button(window, text="Clear Log", command=clear_log) clear_button.pack(pady=5) window.mainloop() # Clean up database connection conn.close() if __name__ == "__main__": init()
Editor is loading...