Untitled
unknown
python
a year ago
3.1 kB
9
Indexable
"""Read the latest Telegram trading-signal post and print the OKX balance.

The script fetches the most recent update from the Telegram Bot API, parses
the signal fields (pair, direction, entry, stop loss, targets) out of the
channel post caption, prints them, and then prints the available balance of
the first asset returned by the OKX account-balance endpoint.
"""

import base64
import hmac
import os
import re
import time
from hashlib import sha256

import requests

# ========================== CONFIGURATION ==========================

# Telegram
# SECURITY: a bot token is a credential and should not live in source control.
# The environment variable takes precedence; the literal is kept only as a
# backward-compatible fallback and should be rotated and removed.
BOT_TOKEN = os.environ.get(
    "TELEGRAM_BOT_TOKEN", "6621450938:AAHnWkTkIV0E_YSzYpaWWrX6_icJkmGiZtU"
)
CHAT_ID = "1001909732318"

# OKX (environment variables take precedence over the empty defaults)
OKX_API_KEY = os.environ.get("OKX_API_KEY", '')
OKX_SECRET_KEY = os.environ.get("OKX_SECRET_KEY", '')
OKX_PASSPHRASE = os.environ.get("OKX_PASSPHRASE", '')
BASE_URL = "https://www.okex.com"

# Seconds to wait for any single HTTP request; without a timeout
# ``requests`` can block forever on a stalled connection.
REQUEST_TIMEOUT = 10

# ========================== FUNCTIONS ==========================


def fetch_last_telegram_message(bot_token, chat_id):
    """Return the most recent update from the bot's ``getUpdates`` feed.

    Args:
        bot_token: Telegram bot token used to build the API URL.
        chat_id: Currently unused; the last update is returned regardless of
            which chat it came from.

    Returns:
        The last update dict, or ``None`` when there are no updates.

    Raises:
        requests.HTTPError: If Telegram answers with an error status.
    """
    url = f"https://api.telegram.org/bot{bot_token}/getUpdates"
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    messages = response.json().get("result") or []
    # An empty feed is a normal condition, not an error: let the caller's
    # ``if last_msg:`` check handle it instead of raising IndexError.
    return messages[-1] if messages else None


def extract_data_from_message(message):
    """Parse the signal fields out of a message text.

    Args:
        message: Text to parse. ``None`` or an empty string yields ``{}``.

    Returns:
        A dict containing only the fields that were found, under the keys
        ``"Pair"``, ``"Direction"``, ``"Entry"`` (list of strings split on
        ``-``), ``"Stop Loss"`` and ``"Targets"`` (list of strings).
    """
    data = {}
    if not message:
        return data

    pair_match = re.search(r"Pair: (\$[A-Z]+)", message)
    direction_match = re.search(r"Direction: (\w+)", message)
    entry_match = re.search(r"ENTRY : (.+)", message)
    stop_loss_match = re.search(r"🚫STOP LOSS: ([\d.,]+)", message)
    targets_matches = re.findall(r"🔘Target \d+ - ([\d.,]+)", message)

    if pair_match:
        data["Pair"] = pair_match.group(1)
    if direction_match:
        data["Direction"] = direction_match.group(1)
    if entry_match:
        # The entry is written as a range such as "100 - 110".
        data["Entry"] = [x.strip() for x in entry_match.group(1).split("-")]
    if stop_loss_match:
        data["Stop Loss"] = stop_loss_match.group(1)
    if targets_matches:
        data["Targets"] = targets_matches
    return data


def create_okx_signature(endpoint, method, timestamp, secret_key):
    """Return the base64-encoded HMAC-SHA256 request signature as bytes.

    The signed message is ``timestamp + method + endpoint``; no request body
    is included.
    """
    message = timestamp + method + endpoint
    mac = hmac.new(
        bytes(secret_key, 'latin-1'),
        msg=bytes(message, 'latin-1'),
        digestmod=sha256,
    )
    return base64.b64encode(mac.digest())


def fetch_okx_server_time():
    """Return the OKX server time as a string of decimal epoch seconds.

    Raises:
        Exception: If the endpoint does not answer with HTTP 200.
    """
    url = BASE_URL + "/api/v5/public/time"
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch server time: {response.text}")
    # "ts" is divided by 1000 here, i.e. treated as milliseconds.
    # NOTE(review): the OKX v5 REST docs appear to describe
    # OK-ACCESS-TIMESTAMP as an ISO-8601 UTC string; confirm that the
    # decimal-seconds form produced here is accepted.
    return str(int(response.json()["data"][0]["ts"]) / 1000)


def fetch_okx_balance():
    """Fetch the account balance from OKX with signed headers.

    Returns:
        The decoded JSON response, or ``None`` (after printing the error)
        when the HTTP status is not 200.
    """
    endpoint = "/api/v5/account/balance"
    timestamp = fetch_okx_server_time()
    headers = {
        "OK-ACCESS-KEY": OKX_API_KEY,
        "OK-ACCESS-SIGN": create_okx_signature(
            endpoint, "GET", timestamp, OKX_SECRET_KEY
        ),
        "OK-ACCESS-TIMESTAMP": timestamp,
        "OK-ACCESS-PASSPHRASE": OKX_PASSPHRASE,
        "Content-Type": "application/json",
        "x-simulated-trading": "1",
    }
    response = requests.get(
        BASE_URL + endpoint, headers=headers, timeout=REQUEST_TIMEOUT
    )
    if response.status_code != 200:
        print(f"ERROR: {response.status_code} - {response.text}")
        return None
    return response.json()


def main():
    """Print the parsed latest signal, then the available OKX balance."""
    last_msg = fetch_last_telegram_message(BOT_TOKEN, CHAT_ID)
    if last_msg:
        # The last update is not guaranteed to be a channel post, and a
        # channel post is not guaranteed to carry a caption.
        caption = (last_msg.get("channel_post") or {}).get("caption")
        if caption:
            print(extract_data_from_message(caption))
        else:
            print("Last update has no channel post caption to parse.")
    else:
        print("No message found.")

    account_balance = fetch_okx_balance()
    if account_balance is None:
        # fetch_okx_balance() has already printed the HTTP error.
        raise SystemExit("Could not fetch the OKX account balance.")

    try:
        available_balance = account_balance['data'][0]['details'][0]['availBal']
    except (KeyError, IndexError, TypeError):
        # Covers payloads whose "data" or "details" list is empty or missing.
        raise SystemExit(
            f"Unexpected OKX balance payload: {account_balance}"
        ) from None
    print(available_balance)


# ========================== MAIN ==========================

if __name__ == "__main__":
    main()
Editor is loading...