# Paste metadata: Untitled | unknown | plain_text | 3 years ago | 26 kB | 12 | Indexable
import tkinter as tk
import threading
import logging
import time
from PIL import ImageTk, Image
import sqlite3
import requests
from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware
from decimal import Decimal
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from uniswap import Uniswap
import json
from tkinter import ttk
from concurrent.futures import ThreadPoolExecutor
# Set up the window
# Root Tk window; every widget created below is parented to it.
window = tk.Tk()
window.title("Token Monitor")
window.geometry("800x600")
# Load and display logo image
# Reads "logo.png" relative to the current working directory.
logo_img = ImageTk.PhotoImage(Image.open("logo.png"))
logo_label = tk.Label(window, image=logo_img)
logo_label.pack(pady=20)
# Input fields
# Each entry is pre-filled with a prompt via insert(0, ...). The prompt is the
# entry's actual content, so .get() returns it until the user replaces it.
ethplorer_api_key_entry = tk.Entry(window, width=40)
ethplorer_api_key_entry.insert(0, "Enter your Ethplorer API key")
ethplorer_api_key_entry.pack(pady=10)
etherscan_api_key_entry = tk.Entry(window, width=40)
etherscan_api_key_entry.insert(0, "Enter your Etherscan API key")
etherscan_api_key_entry.pack(pady=10)
infura_project_id_entry = tk.Entry(window, width=40)
infura_project_id_entry.insert(0, "Enter your Infura project ID")
infura_project_id_entry.pack(pady=10)
gmail_user_entry = tk.Entry(window, width=40)
gmail_user_entry.insert(0, "Enter your Gmail address")
gmail_user_entry.pack(pady=10)
# show="*" masks the typed characters (the pre-filled prompt is masked too).
gmail_password_entry = tk.Entry(window, width=40, show="*")
gmail_password_entry.insert(0, "Enter your Gmail password")
gmail_password_entry.pack(pady=10)
metamask_account_entry = tk.Entry(window, width=40)
metamask_account_entry.insert(0, "Enter your MetaMask account address")
metamask_account_entry.pack(pady=10)
private_key_entry = tk.Entry(window, width=40, show="*")
private_key_entry.insert(0, "Enter your MetaMask private key")
private_key_entry.pack(pady=10)
# NOTE(review): start_tracking() and update_threshold() pass this entry's text
# to float(); the pre-filled prompt is not numeric.
threshold_entry = tk.Entry(window, width=40)
threshold_entry.insert(0, "Enter threshold percentage")
threshold_entry.pack(pady=10)
# Log display
# Text widget that update_log() appends to and clear_log() empties.
log_text = tk.Text(window, height=10, width=80)
log_text.pack(pady=20)
# Watchlist Treeview
# The tree column (#0) holds the token; the three data columns hold the
# price, the threshold and the manual-sell marker.
watchlist_frame = tk.Frame(window)
watchlist_frame.pack(pady=10)
watchlist_treeview = ttk.Treeview(watchlist_frame, columns=("Price", "Threshold", "Manual Sell"))
watchlist_treeview.heading("#0", text="Token")
watchlist_treeview.heading("Price", text="Price")
watchlist_treeview.heading("Threshold", text="Threshold")
watchlist_treeview.heading("Manual Sell", text="Manual Sell")
watchlist_treeview.pack(side="left")
# Vertical scrollbar wired both ways: it scrolls the tree, and the tree
# reports its view position back to the scrollbar.
treeview_scrollbar = ttk.Scrollbar(watchlist_frame, orient="vertical", command=watchlist_treeview.yview)
treeview_scrollbar.pack(side="right", fill="y")
watchlist_treeview.configure(yscrollcommand=treeview_scrollbar.set)
# Configure logging
# Timestamped records are appended to token_monitor.log in the working directory.
log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
log_handler = logging.FileHandler('token_monitor.log')
log_handler.setFormatter(log_formatter)
log_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(log_handler)
# The root logger defaults to WARNING, which discards logging.info(...) records
# before they ever reach the INFO-level handler above. Lower it only if needed
# so a more verbose level configured elsewhere is left untouched.
if logging.getLogger().getEffectiveLevel() > logging.INFO:
    logging.getLogger().setLevel(logging.INFO)
def update_log(log):
    """Append ``log`` as a new line to the on-screen log and record it at INFO.

    Args:
        log: Message text; a newline is appended before insertion.
    """
    # The widget is switched to 'normal' for the insert and back to 'disabled'
    # afterwards.
    log_text["state"] = 'normal'
    log_text.insert("end", log + "\n")
    log_text["state"] = 'disabled'
    log_text.see("end")  # scroll so the newest line is visible
    logging.info(log)
def update_watchlist(token, price, threshold):
    """Append a row for ``token`` to the watchlist tree.

    Args:
        token: Text shown in the tree column.
        price: Current price; may be a number, a numeric string or "".
        threshold: Threshold percentage used to derive the threshold price;
            may be a number, a numeric string or "".

    The Threshold column is left blank when either ``price`` or ``threshold``
    is not numeric (for example the empty strings passed when a token is
    first added), instead of raising from ``float("")``.
    """
    try:
        # Use the caller-supplied percentage rather than an unrelated global.
        threshold_price = calculate_threshold_price(float(price), float(threshold))
    except (TypeError, ValueError):
        threshold_price = ""
    watchlist_treeview.insert("", tk.END, text=token, values=(price, threshold_price, ""))
def update_manual_sell(token):
    """Mark the watchlist row(s) for ``token`` with "SELL" in the Manual Sell column.

    Args:
        token: Either a tree item id or the token text shown in the tree column.
    """
    # Backward compatible: honour a real item id if one was passed.
    if watchlist_treeview.exists(token):
        watchlist_treeview.set(token, "Manual Sell", "SELL")
        return
    # Rows are inserted without an explicit item id, so the token text is not
    # an id; locate the row(s) by their displayed text instead.
    for item_id in watchlist_treeview.get_children():
        if watchlist_treeview.item(item_id, "text") == token:
            watchlist_treeview.set(item_id, "Manual Sell", "SELL")
def init():
    """Create the global Web3 client and Uniswap router contracts, then run the UI.

    Sets the module globals ``w3``, ``uniswap_router_v2`` and
    ``uniswap_router_v3`` and finally enters ``window.mainloop()``, so this
    call does not return until the main loop ends.
    """
    global w3, uniswap_router_v2, uniswap_router_v3

    def _router_abi(weth_param, weth_getter):
        # Minimal ABI: the constructor plus the single view function that
        # returns the wrapped-ETH address.
        return [
            {
                "inputs": [
                    {"internalType": "address", "name": "_factory", "type": "address"},
                    {"internalType": "address", "name": weth_param, "type": "address"},
                ],
                "stateMutability": "nonpayable",
                "type": "constructor",
            },
            {
                "inputs": [],
                "name": weth_getter,
                "outputs": [{"internalType": "address", "name": "", "type": "address"}],
                "stateMutability": "view",
                "type": "function",
            },
        ]

    # Initialize Web3 against the Infura endpoint for the entered project ID
    project_id = infura_project_id_entry.get()
    endpoint = f'https://mainnet.infura.io/v3/{project_id}'
    w3 = Web3(HTTPProvider(endpoint))
    w3.middleware_onion.inject(geth_poa_middleware, layer=0)

    # Initialize Uniswap Routers
    v2_address = '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D'  # Uniswap V2 Router address
    v3_address = '0xE592427A0AEce92De3Edee1F18E0157C05861564'  # Uniswap V3 Router address
    v2_abi = _router_abi("_WETH", "WETH")
    v3_abi = _router_abi("_WETH9", "WETH9")
    uniswap_router_v2 = w3.eth.contract(address=v2_address, abi=v2_abi)
    uniswap_router_v3 = w3.eth.contract(address=v3_address, abi=v3_abi)

    # Start the UI
    window.mainloop()
# Start tracking tokens
def start_tracking():
    """Read and validate the input fields, then open the token database.

    On success the entered values are published as module globals
    (``ethplorer_api_key``, ``etherscan_api_key``, ``infura_project_id``,
    ``gmail_user``, ``gmail_password``, ``metamask_account``, ``private_key``,
    ``threshold_percentage``) because the other functions in this file read
    them by those names. ``conn`` / ``c`` are published the same way, and
    ``stop_threads`` is reset to False.

    On invalid input a message is written to the on-screen log and nothing is
    changed.
    """
    global ethplorer_api_key, etherscan_api_key, infura_project_id
    global gmail_user, gmail_password, metamask_account, private_key
    global threshold_percentage, conn, c, stop_threads

    entered = {
        "ethplorer_api_key": ethplorer_api_key_entry.get(),
        "etherscan_api_key": etherscan_api_key_entry.get(),
        "infura_project_id": infura_project_id_entry.get(),
        "gmail_user": gmail_user_entry.get(),
        "gmail_password": gmail_password_entry.get(),
        "metamask_account": metamask_account_entry.get(),
        "private_key": private_key_entry.get(),
    }

    # Validate API keys and private key
    if not all(entered.values()):
        update_log("Please fill in all the required fields.")
        return

    # A non-numeric threshold is reported instead of raising from the callback.
    try:
        threshold_value = float(threshold_entry.get())
    except ValueError:
        update_log("Please enter a numeric threshold percentage.")
        return

    # Close a connection left over from an earlier call before replacing it.
    previous_conn = globals().get("conn")
    if previous_conn is not None:
        previous_conn.close()

    ethplorer_api_key = entered["ethplorer_api_key"]
    etherscan_api_key = entered["etherscan_api_key"]
    infura_project_id = entered["infura_project_id"]
    gmail_user = entered["gmail_user"]
    gmail_password = entered["gmail_password"]
    metamask_account = entered["metamask_account"]
    private_key = entered["private_key"]
    threshold_percentage = threshold_value
    stop_threads = False

    # Create database connection. The ``with`` block commits (or rolls back)
    # the CREATE TABLE; it does not close the connection, which stays open for
    # the query helpers.
    conn = sqlite3.connect('tokens.db')
    with conn:
        c = conn.cursor()
        c.execute("""CREATE TABLE IF NOT EXISTS tokens (
            token_address TEXT,
            purchase_price REAL,
            purchase_transaction_hash TEXT
        )""")
def get_abi(token_address):
    """Return the contract ABI for ``token_address``.

    The ABI is requested from Etherscan; if the request fails or the reply
    cannot be decoded into a list, a minimal ERC-20 ABI (``balanceOf``,
    ``allowance``, ``approve``) is returned instead.

    Args:
        token_address: Contract address to look up.

    Returns:
        list: ABI entries as dictionaries.
    """
    standard_erc20_abi = [
        {
            "constant": True,
            "inputs": [{"name": "_owner", "type": "address"}],
            "name": "balanceOf",
            "outputs": [{"name": "balance", "type": "uint256"}],
            "type": "function"
        },
        {
            "constant": True,
            "inputs": [
                {"name": "_owner", "type": "address"},
                {"name": "_spender", "type": "address"}
            ],
            "name": "allowance",
            "outputs": [{"name": "", "type": "uint256"}],
            "type": "function"
        },
        {
            "constant": False,
            "inputs": [
                {"name": "_spender", "type": "address"},
                {"name": "_value", "type": "uint256"}
            ],
            "name": "approve",
            "outputs": [{"name": "", "type": "bool"}],
            "type": "function"
        }
    ]
    try:
        # Try to fetch the specific ABI from Etherscan. The timeout keeps a
        # stalled connection from blocking the caller indefinitely.
        response = requests.get(
            f"https://api.etherscan.io/api?module=contract&action=getabi&address={token_address}",
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        abi = json.loads(data['result'])
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as e:
        # Only transport and decoding failures trigger the fallback; anything
        # else (e.g. KeyboardInterrupt, programming errors) now propagates.
        logging.warning(f'Falling back to the standard ERC20 ABI for {token_address}: {str(e)}')
        return standard_erc20_abi
    if not isinstance(abi, list):
        # A decodable but non-ABI payload is treated like a failed lookup.
        return standard_erc20_abi
    return abi
def init_web3():
    """Build a Web3 client for the Infura mainnet URL of the entered project ID.

    Returns:
        Web3: Client with ``geth_poa_middleware`` injected at layer 0.
    """
    project_id = infura_project_id_entry.get()
    client = Web3(HTTPProvider(f"https://mainnet.infura.io/v3/{project_id}"))
    client.middleware_onion.inject(geth_poa_middleware, layer=0)
    return client


# Module-level client, built from whatever the project-ID entry holds at this point.
w3 = init_web3()
def get_token_info(token):
    """Look up ``token`` in the ``tokens`` table.

    Args:
        token: Token address to match against ``token_address``.

    Returns:
        tuple: ``(token_address, purchase_price)`` for the first matching row,
        or ``(None, None)`` when there is no row or the query fails.
    """
    try:
        c.execute("SELECT token_address, purchase_price FROM tokens WHERE token_address = ?", (token,))
        row = c.fetchone()
    except sqlite3.Error as e:
        logging.error(f'Error retrieving token information from database: {str(e)}')
        return None, None
    if not row:
        return None, None
    return row[0], row[1]
def is_token_in_database(token):
    """Return True when ``get_token_info`` finds an address for ``token``."""
    stored_address = get_token_info(token)[0]
    return stored_address is not None
def get_purchase_price_from_database(token):
    """Return the stored purchase price for ``token``, or None if unavailable."""
    return get_token_info(token)[1]
def get_current_prices(tokens):
    """Fetch current prices for ``tokens`` from Ethplorer.

    Args:
        tokens: Iterable of token address strings.

    Returns:
        dict: Maps token address to price (float). Tokens whose entry has no
        usable price are omitted; ``{}`` is returned for empty input or when
        the request fails.
    """
    token_list = ','.join(tokens)
    if not token_list:
        # Nothing to look up; avoid a request with an empty path segment.
        return {}
    try:
        response = requests.get(
            f'https://api.ethplorer.io/getTokenInfo/{token_list}?apiKey={ethplorer_api_key}',
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        logging.error(f'Error retrieving current prices: {str(e)}')
        return {}

    # Accept either a single token object or a list of them; iterating a dict
    # directly would yield its keys rather than token entries.
    entries = [data] if isinstance(data, dict) else data
    prices = {}
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        price_info = entry.get('price')
        token_address = entry.get('address')
        # Skip entries whose price is not a dict, rather than crashing on
        # subscripting a non-dict value.
        if token_address is None or not isinstance(price_info, dict):
            continue
        try:
            prices[token_address] = float(price_info['rate'])
        except (KeyError, TypeError, ValueError):
            continue
    return prices
def calculate_threshold_price(purchase_price, threshold_percentage):
    """Return ``purchase_price`` adjusted by ``threshold_percentage`` percent.

    Args:
        purchase_price: Base price.
        threshold_percentage: Percentage to apply (10 means +10%).

    Returns:
        ``purchase_price * (1 + threshold_percentage / 100)``.
    """
    multiplier = 1 + threshold_percentage / 100
    return purchase_price * multiplier
def track_wallet():
    """Poll the wallet once a minute and sell tokens that reached their threshold.

    Runs until the module global ``stop_threads`` becomes true. For every
    token held by ``metamask_account`` that has a purchase price in the
    database, the current price is compared with the threshold price; when it
    is reached and the gas price is at or below ``AVERAGE_GAS_PRICE``, the
    token is sold, removed from the database and an e-mail is sent.

    NOTE(review): ``AVERAGE_GAS_PRICE`` is not defined in the visible source;
    it must exist at module level before this loop runs.
    """
    global stop_threads
    while not stop_threads:
        try:
            for token in get_current_tokens(metamask_account):
                # Only tokens recorded in the database have a purchase price
                # to compare against; the rest are skipped.
                purchase_price = get_purchase_price_from_database(token)
                if purchase_price is None:
                    continue
                # Fetch a fresh price on every cycle; a price cached forever
                # could never cross the threshold later.
                current_price = get_current_price(token)
                if current_price is None:
                    continue
                threshold_price = calculate_threshold_price(purchase_price, threshold_percentage)
                if current_price < threshold_price:
                    continue
                gas_price = get_gas_price()
                # get_gas_price() returns None on failure; do not compare it.
                if gas_price is None or gas_price > AVERAGE_GAS_PRICE:
                    continue
                gas_spent, profit = sell_token(token)
                if gas_spent is not None and profit is not None:
                    remove_from_database(token)
                    send_email(token, profit, gas_spent)
                    stop_price_checking_thread(token)  # Stop tracking thread after selling and converting to ETH
        except Exception as e:
            logging.error(f'An error occurred while tracking the wallet: {str(e)}')
        # Sleep outside the try so a failing cycle waits too instead of
        # spinning in a tight error loop.
        time.sleep(60)
def get_current_tokens(account):
    """Return the token addresses Ethplorer reports for ``account``.

    Args:
        account: Wallet address to query.

    Returns:
        list: Token address strings; ``[]`` when the request fails or the
        reply lists no tokens.
    """
    try:
        response = requests.get(
            f'https://api.ethplorer.io/getAddressInfo/{account}?apiKey={ethplorer_api_key}',
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        logging.error(f'Error retrieving current tokens: {str(e)}')
        return []

    # A reply without a "tokens" key is treated as an empty wallet, not an error.
    tokens = data.get('tokens') if isinstance(data, dict) else None
    addresses = []
    for entry in tokens or []:
        try:
            addresses.append(entry['tokenInfo']['address'])
        except (KeyError, TypeError):
            continue  # skip malformed entries instead of losing the whole list
    return addresses
def get_current_price(token):
    """Return the current price of ``token`` from Ethplorer.

    Args:
        token: Token address.

    Returns:
        float | None: The ``price.rate`` value, or None when the request
        fails or the reply carries no usable price.
    """
    try:
        response = requests.get(
            f'https://api.ethplorer.io/getTokenInfo/{token}?apiKey={ethplorer_api_key}',
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        return float(data['price']['rate'])
    except requests.exceptions.RequestException as e:
        logging.error(f'Error retrieving current price for {token}: {str(e)}')
        return None
    except (KeyError, TypeError, ValueError) as e:
        # Covers a missing or non-dict "price" field and undecodable payloads.
        logging.error(f'No usable price for {token}: {str(e)}')
        return None
def get_gas_price():
    """Return the ``safeLow`` gas price from Ethplorer divided by 10**9.

    Returns:
        float | None: The scaled value, or None when the request fails.

    NOTE(review): the division is meant as a Wei-to-Gwei conversion; the unit
    of ``safeLow`` is not visible here — confirm against the API.
    """
    url = f'https://api.ethplorer.io/getGasPrice?apiKey={ethplorer_api_key}'
    try:
        reply = requests.get(url)
        reply.raise_for_status()
        payload = reply.json()
        safe_low = float(payload['safeLow'])
        return safe_low / 10**9  # Convert from Wei to Gwei
    except requests.exceptions.RequestException as e:
        logging.error(f'Error retrieving gas price: {str(e)}')
        return None
def get_uniswap_version_with_more_liquidity(token):
    """Pick the Uniswap version (2 or 3) reporting more liquidity for ``token``.

    Args:
        token: Token address.

    Returns:
        int | None: 2 or 3 for the version with strictly more liquidity;
        None when both report the same value or any step raises.
    """
    try:
        v2_client = Uniswap(private_key, w3, version=2)
        v3_client = Uniswap(private_key, w3, version=3)
        checksummed = Web3.toChecksumAddress(token)
        contract = w3.eth.contract(address=checksummed, abi=get_abi(checksummed))
        v2_liquidity = v2_client.get_liquidity(contract)
        v3_liquidity = v3_client.get_liquidity(contract)
        if v2_liquidity > v3_liquidity:
            return 2
        if v3_liquidity > v2_liquidity:
            return 3
        return None
    except Exception as e:
        logging.error(f'An error occurred while checking Uniswap version with more liquidity for {token}: {str(e)}')
        return None
def sell_token(token):
    """Sell the wallet's whole balance of ``token`` for ETH through Uniswap.

    Steps: choose the Uniswap version via
    ``get_uniswap_version_with_more_liquidity``, approve the router if the
    current allowance is zero, check balance and liquidity, then build, sign
    and send the swap transaction and wait for its receipt.

    Args:
        token: Token address.

    Returns:
        tuple: ``(gas_spent, profit)`` on success; ``(None, None)`` when no
        version is selected, the balance is zero, the liquidity probe returns
        zero, or any step raises.
    """
    try:
        uniswap_version = get_uniswap_version_with_more_liquidity(token)
        if uniswap_version == 2:
            uniswap_v2 = Uniswap(private_key, w3, version=2)
            uniswap_router_address = '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D' # Uniswap V2 Router address
            uniswap = uniswap_v2
        elif uniswap_version == 3:
            uniswap_v3 = Uniswap(private_key, w3, version=3)
            uniswap_router_address = '0xE592427A0AEce92De3Edee1F18E0157C05861564' # Uniswap V3 Router address
            uniswap = uniswap_v3
        else:
            logging.info(f'No Uniswap version with more liquidity found for {token}.')
            return None, None
        token_address = Web3.toChecksumAddress(token)
        token_contract = w3.eth.contract(address=token_address, abi=get_abi(token_address))
        # An approval transaction is sent only when the router currently has no allowance.
        allowance = token_contract.functions.allowance(metamask_account, uniswap_router_address).call()
        if allowance == 0:
            gas_price = w3.eth.gas_price
            nonce = w3.eth.getTransactionCount(metamask_account)
            txn_dict = token_contract.functions.approve(uniswap_router_address, Web3.toWei(1e18, 'ether')).buildTransaction({
                'chainId': 1,
                'gas': 50000,
                'gasPrice': gas_price,
                'nonce': nonce,
            })
            signed_txn = w3.eth.account.signTransaction(txn_dict, private_key=private_key)
            txn_hash = w3.eth.sendRawTransaction(signed_txn.rawTransaction)
            # Blocks until the approval has a receipt.
            txn_receipt = w3.eth.waitForTransactionReceipt(txn_hash)
            logging.info("Approved Uniswap to spend tokens\n" + str(txn_receipt))
        balance = token_contract.functions.balanceOf(metamask_account).call()
        if balance == 0:
            logging.info(f"No {token} tokens to sell.")
            return None, None
        # NOTE(review): the path ends at w3.eth.default_account; a token-to-ETH
        # route presumably should end at the wrapped-ETH token address — confirm.
        path = [w3.toChecksumAddress(token), w3.toChecksumAddress(w3.eth.default_account)]
        # Probe with an input amount of 1 to see whether the route yields any output.
        liquidity = uniswap.get_amounts_out(1, path)
        if liquidity[1] == 0:
            logging.info(f"No liquidity for {token} in Uniswap pool.")
            return None, None
        # Gas price and nonce are re-read because an approval may have been sent above.
        gas_price = w3.eth.gas_price
        nonce = w3.eth.getTransactionCount(metamask_account)
        # NOTE(review): `uniswap_router` is not assigned in this function, and
        # init() only creates uniswap_router_v2 / uniswap_router_v3 — confirm
        # which contract object is intended here.
        txn_dict = uniswap_router.functions.swapExactTokensForETHSupportingFeeOnTransferTokens(
            balance, # amountIn
            0, # amountOutMin, set to 0 to accept any amount
            [token_address, w3.toChecksumAddress(w3.eth.default_account)], # path
            metamask_account, # recipient
            int(time.time()) + 10 * 60 # deadline
        ).buildTransaction({
            'chainId': 1,
            'gas': 500000,
            'gasPrice': gas_price,
            'nonce': nonce,
        })
        signed_txn = w3.eth.account.signTransaction(txn_dict, private_key=private_key)
        txn_hash = w3.eth.sendRawTransaction(signed_txn.rawTransaction)
        txn_receipt = w3.eth.waitForTransactionReceipt(txn_hash)
        logging.info(f"Sold {balance} {token} tokens\n" + str(txn_receipt))
        # Gas used by the swap multiplied by the gas price expressed in gwei.
        gas_spent = txn_receipt['gasUsed'] * w3.fromWei(txn_dict['gasPrice'], 'gwei')
        # NOTE(review): this subtracts the purchase price from a gas-usage
        # figure (cumulativeGasUsed); presumably the ETH proceeds of the swap
        # were intended — confirm.
        profit = w3.fromWei(txn_receipt['cumulativeGasUsed'], 'ether') - get_purchase_price_from_database(token)
        return gas_spent, profit
    except Exception as e:
        logging.error(f'An error occurred while selling token {token}: {str(e)}')
        return None, None
def remove_from_database(token):
    """Delete every ``tokens`` row whose ``token_address`` equals ``token``.

    The delete is committed immediately; a database error is logged rather
    than raised.
    """
    delete_sql = "DELETE FROM tokens WHERE token_address = ?"
    try:
        c.execute(delete_sql, (token,))
        conn.commit()
    except sqlite3.Error as e:
        logging.error(f'Error removing token from database: {str(e)}')
    else:
        logging.info(f'Token {token} removed from the database.')
def send_email(token, profit, gas_spent):
    """E-mail a sale notification from ``gmail_user`` to ``gmail_user``.

    Args:
        token: Token that was sold.
        profit: Value reported in the message as ETH profit.
        gas_spent: Value reported in the message as Gwei spent.

    Failures (SMTP protocol errors and connection-level errors) are logged,
    not raised.
    """
    msg = MIMEMultipart()
    msg['From'] = gmail_user
    msg['To'] = gmail_user
    msg['Subject'] = 'Token Sold'
    body = f'Token {token} has been sold. Profit: {profit} ETH. Gas spent: {gas_spent} Gwei.'
    msg.attach(MIMEText(body, 'plain'))
    try:
        # The context manager closes the connection even when starttls/login/
        # sendmail raises, which the explicit quit() never did on failure.
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
            server.starttls()
            server.login(gmail_user, gmail_password)
            server.sendmail(gmail_user, gmail_user, msg.as_string())
        logging.info('Email notification sent.')
    except (smtplib.SMTPException, OSError) as e:
        # OSError also covers DNS, refused-connection and timeout failures.
        logging.error(f'Error sending email notification: {str(e)}')
def show_wallet_balance():
    """Return the wallet's ``ETH.balance`` from Ethplorer divided by 10**18.

    Returns:
        float | None: The scaled balance, or None when the request fails.

    NOTE(review): the division assumes the API reports the balance in wei —
    confirm against the Ethplorer response format.
    """
    url = f'https://api.ethplorer.io/getAddressInfo/{metamask_account}?apiKey={ethplorer_api_key}'
    try:
        reply = requests.get(url)
        reply.raise_for_status()
        payload = reply.json()
        return float(payload['ETH']['balance']) / 10**18
    except requests.exceptions.RequestException as e:
        logging.error(f'Error retrieving wallet balance: {str(e)}')
        return None
def clear_log():
    """Remove all text from the on-screen log widget."""
    # The widget is switched to 'normal' for the delete and back to 'disabled'
    # afterwards.
    log_text["state"] = 'normal'
    log_text.delete("1.0", "end")
    log_text["state"] = 'disabled'
def stop_tracking():
    """Set the ``stop_threads`` flag polled by ``track_wallet`` and log it."""
    global stop_threads
    # track_wallet() checks this flag at the top of each loop iteration.
    stop_threads = True
    logging.info('Token tracking stopped.')
def stop_price_checking_thread(token):
    """Mark ``token`` as stopped in the global ``threads`` mapping.

    Nothing happens when ``token`` has no entry in ``threads``.

    NOTE(review): ``threads`` is not assigned anywhere in the visible source;
    it must exist at module level before this is called.
    """
    global threads
    if token not in threads:
        return
    threads[token] = False
    logging.info(f'Price checking stopped for token {token}.')
def manual_sell(token):
    """Ask for confirmation, then sell ``token`` and report the result.

    After a successful sale the token is removed from the database, an e-mail
    is sent, its price-checking flag is cleared and a summary line is written
    to the on-screen log.

    Args:
        token: Token address to sell.
    """
    # ``import tkinter`` does not load the messagebox submodule, so
    # ``tk.messagebox`` is not guaranteed to exist; import it explicitly.
    from tkinter import messagebox

    # askyesno() returns a bool, so test it directly rather than against tk.YES.
    confirmed = messagebox.askyesno("Manual Sell", f"Do you want to manually sell {token}?")
    if not confirmed:
        return
    gas_spent, profit = sell_token(token)
    if gas_spent is None or profit is None:
        return
    remove_from_database(token)
    send_email(token, profit, gas_spent)
    stop_price_checking_thread(token)  # Stop tracking thread after selling and converting to ETH
    update_log(f"Token {token} manually sold. Profit: {profit} ETH. Gas spent: {gas_spent} Gwei.")
def add_token_to_watchlist():
    """Add the token typed into ``token_entry`` to the watchlist.

    Blank input (after stripping whitespace) is ignored. The entry is cleared
    once the row has been added.
    """
    token = token_entry.get().strip()
    if not token:
        return
    # Price and threshold are not known yet, so both are passed as "".
    update_watchlist(token, "", "")
    token_entry.delete(0, tk.END)
def remove_token_from_watchlist():
    """Remove every selected row from the watchlist and stop its price checks."""
    # selection() returns a tuple of item ids; handle each id on its own so
    # removing several selected rows at once works.
    for item_id in watchlist_treeview.selection():
        token = watchlist_treeview.item(item_id)['text']
        watchlist_treeview.delete(item_id)
        stop_price_checking_thread(token)
        logging.info(f'Token {token} removed from watchlist.')
def update_threshold():
    """Recompute the Threshold column from each row's price and the entered percentage.

    Rows without a numeric price are left untouched. A non-numeric percentage
    is reported in the on-screen log and nothing is changed.
    """
    try:
        new_threshold_percentage = float(threshold_entry.get())
    except ValueError:
        update_log("Please enter a numeric threshold percentage.")
        return
    for child in watchlist_treeview.get_children():
        # Derive the threshold from the row's price. Scaling the previous
        # threshold would compound the percentage on every click.
        try:
            base_price = float(watchlist_treeview.set(child, "Price"))
        except (TypeError, ValueError):
            continue
        new_threshold_price = base_price * (1 + new_threshold_percentage / 100)
        watchlist_treeview.set(child, "Threshold", f"{new_threshold_price:.6f}")
def update_manual_sell_button():
    """Flag the selected watchlist row with "SELL" in the Manual Sell column.

    Does nothing when no row is selected. After marking the row it also calls
    ``update_manual_sell`` with the row's token text.
    """
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    token = watchlist_treeview.item(selected_item)['text']
    watchlist_treeview.set(selected_item, "Manual Sell", "SELL")
    update_manual_sell(token)
def create_menu(event):
    """Show the watchlist context menu at the pointer when a row is selected.

    Args:
        event: Tk event; ``x_root`` / ``y_root`` give the screen position
            where the menu is posted.
    """
    # The token text is not needed to post the menu, so the unused item lookup
    # (which could raise and block the menu) is gone.
    if watchlist_treeview.selection():
        menu.post(event.x_root, event.y_root)
def copy_address():
    """Copy the selected row's token text to the clipboard and log it."""
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    token = watchlist_treeview.item(selected_item)['text']
    # Replace whatever the clipboard held with the token text.
    window.clipboard_clear()
    window.clipboard_append(token)
    logging.info(f'Token {token} address copied to clipboard.')
def copy_price():
    """Copy the selected row's Price value to the clipboard and log it."""
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    row_values = watchlist_treeview.item(selected_item)['values']
    price = row_values[0]  # first data column: Price
    window.clipboard_clear()
    window.clipboard_append(price)
    logging.info(f'Price {price} copied to clipboard.')
def copy_threshold():
    """Copy the selected row's Threshold value to the clipboard and log it."""
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    row_values = watchlist_treeview.item(selected_item)['values']
    threshold = row_values[1]  # second data column: Threshold
    window.clipboard_clear()
    window.clipboard_append(threshold)
    logging.info(f'Threshold {threshold} copied to clipboard.')
def copy_manual_sell():
    """Copy the selected row's Manual Sell value to the clipboard and log it."""
    selected_item = watchlist_treeview.selection()
    if not selected_item:
        return
    row_values = watchlist_treeview.item(selected_item)['values']
    # Named so the local does not shadow the manual_sell() function.
    sell_marker = row_values[2]  # third data column: Manual Sell
    window.clipboard_clear()
    window.clipboard_append(sell_marker)
    logging.info(f'Manual Sell {sell_marker} copied to clipboard.')
# Add token entry
# Free-text field read by add_token_to_watchlist().
token_entry = tk.Entry(window, width=40)
token_entry.pack(pady=10)
# Add token button
add_token_button = tk.Button(window, text="Add Token", command=add_token_to_watchlist)
add_token_button.pack(pady=5)
# Remove token button
remove_token_button = tk.Button(window, text="Remove Token", command=remove_token_from_watchlist)
remove_token_button.pack(pady=5)
# Threshold update button
threshold_update_button = tk.Button(window, text="Update Threshold", command=update_threshold)
threshold_update_button.pack(pady=5)
# Manual sell button
# Wired to update_manual_sell_button(), which marks the selected row "SELL".
# NOTE(review): nothing in the visible source calls manual_sell() — confirm
# whether this button was meant to trigger the actual sale.
manual_sell_button = tk.Button(window, text="Manual Sell", command=update_manual_sell_button)
manual_sell_button.pack(pady=5)
# Watchlist right-click menu
# tearoff=0 removes the tear-off entry; each command copies one field of the
# selected row to the clipboard.
menu = tk.Menu(window, tearoff=0)
menu.add_command(label="Copy Token Address", command=copy_address)
menu.add_command(label="Copy Price", command=copy_price)
menu.add_command(label="Copy Threshold", command=copy_threshold)
menu.add_command(label="Copy Manual Sell", command=copy_manual_sell)
# Watchlist Treeview bindings
# <Button-3> posts the context menu through create_menu().
watchlist_treeview.bind("<Button-3>", create_menu)
# Start tracking button
start_button = tk.Button(window, text="Start Tracking", command=start_tracking)
start_button.pack(pady=10)
# Stop tracking button
stop_button = tk.Button(window, text="Stop Tracking", command=stop_tracking)
stop_button.pack(pady=5)
# Wallet balance button
# NOTE(review): show_wallet_balance() only returns the balance; a button
# command's return value is not displayed anywhere — confirm intent.
balance_button = tk.Button(window, text="Wallet Balance", command=show_wallet_balance)
balance_button.pack(pady=5)
# Clear log button
clear_button = tk.Button(window, text="Clear Log", command=clear_log)
clear_button.pack(pady=5)
if __name__ == "__main__":
    # init() builds the Web3 client and router contracts and then enters
    # window.mainloop(). Running the main loop at module level first would
    # delay init() until the window had already been closed.
    try:
        init()
    finally:
        # Clean up database connection. ``conn`` only exists once
        # start_tracking() has published it, so look it up defensively
        # instead of assuming the name is defined.
        _open_connection = globals().get("conn")
        if _open_connection is not None:
            _open_connection.close()
# Editor is loading...