Untitled

 avatar
unknown
plain_text
2 days ago
14 kB
26
Indexable
import asyncio
import sys
import tkinter as tk
import win32api
import win32con
import win32gui
import logging
import time
import threading
import struct
import random

from pymem import Pymem
from pymem.exception import MemoryReadError
from tkinter import messagebox

# ------------------------------------------------------------
# Configuration Constants
# ------------------------------------------------------------

# Executable name handed to Pymem() in init() to attach to the game process.
PROCESS_NAME = "PathOfExileSteam.exe"
# Added to pm.base_address in init(); the result is the address every pointer
# chain below starts from.
BASE_POINTER_OFFSET = 0x040A6128

# Memory Offsets
# Each list is one pointer chain consumed by find_dma_address(): every entry
# except the last is dereferenced in turn, and the last entry is added to the
# final pointer without being dereferenced.
# NOTE(review): these values are presumably tied to one specific game build —
# confirm them after every game update.
OFFSETS_CUR_HP = [0x38, 0x0, 0x80, 0x290, 0x1D8]
OFFSETS_MAX_HP = [0x60, 0x68, 0x40, 0x20, 0x1F0, 0x2C]
OFFSETS_CUR_MP = [0x98, 0x70, 0x20, 0x50, 0x360]
OFFSETS_MAX_MP = [0x60, 0x38, 0x20, 0x48, 0x10, 0x1F0, 0x2C]

# Potion Keys
# Single characters passed to press_key(); it converts them with ord().
HP_KEY = "1"
MP_KEY = "2"

# Thresholds
# Absolute values (not percentages): a flask is triggered when the value read
# from memory is less than or equal to the threshold.
HP_THRESHOLD = 1600
MP_THRESHOLD = 200

# Cooldowns
# Minimum time between two presses of the same flask key, compared against
# time.time() differences, i.e. in seconds.
HP_FLASK_COOLDOWN = 2.0
MP_FLASK_COOLDOWN = 2.0

# Intervals
# Base pause in seconds between polling iterations; each routine randomises it
# through get_random_delay() before sleeping.
CHECK_INTERVAL = 0.25

# ------------------------------------------------------------
# Logger Setup
# ------------------------------------------------------------

# INFO level means the logger.debug() traces in this file are suppressed by
# default; switch to logging.DEBUG to see the pointer-chain walk.
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(message)s')
# getLogger() without a name returns the root logger.
logger = logging.getLogger()

# Global state
# Everything below is populated by init() / rehook_pointers() and then read
# (and partly rewritten) by the monitoring routines.
pm = None  # Pymem handle, created in init()
base_pointer = None  # pm.base_address + BASE_POINTER_OFFSET, set in init()
game_window = None  # window handle that press_key() posts messages to
cur_hp_addr = None  # resolved addresses; replaced together by rehook_pointers()
max_hp_addr = None
last_max_hp = None  # last max HP seen; validate_pointers() rehooks when it changes
cur_mp_addr = None
max_mp_addr = None
last_max_mp = None  # last max MP seen; validate_pointers() rehooks when it changes
last_hp_flask_time = 0  # time.time() stamp of the last HP flask press (0 = never)
last_mp_flask_time = 0  # time.time() stamp of the last MP flask press (0 = never)
# Only polled with is_set() by the routines and set() in the __main__
# KeyboardInterrupt handler; nothing awaits it.
# NOTE(review): the Event is created at import time, before asyncio.run()
# starts a loop. On Python versions older than 3.10 an Event binds to an event
# loop at construction — confirm the target version before adding
# `await stop_event.wait()` anywhere.
stop_event = asyncio.Event()
root = None  # hidden Tk root, created lazily by show_error_popup()

# ------------------------------------------------------------
# Helper Functions
# ------------------------------------------------------------

def find_dma_address(pm, base, offsets, label=""):
    """Resolve a multi-level pointer chain to a final address.

    Reads the 64-bit value stored at ``base``; then, for every offset except
    the last, reads the 64-bit value stored at ``pointer + offset``. The last
    offset is added to the final pointer without dereferencing it.

    Args:
        pm: Object exposing ``read_longlong(address)``.
        base: Address that holds the first pointer of the chain.
        offsets: Non-empty sequence of integer offsets.
        label: Tag prepended to the log messages of this walk.

    Returns:
        The resolved address, or ``None`` when ``offsets`` is empty, any
        pointer in the chain (including the last one) is null, or a read
        raises ``MemoryReadError``.
    """
    if len(offsets) == 0:
        # Without at least one offset there is no final address to compute;
        # report it the same way as any other failed resolution.
        logger.warning(f"[{label}] No offsets supplied; cannot resolve address")
        return None

    try:
        addr = pm.read_longlong(base)
        logger.debug(f"[{label}] Base: 0x{base:X} → 0x{addr:X}")

        for i, offset in enumerate(offsets[:-1]):
            if addr == 0:
                logger.warning(f"[{label}] Null pointer encountered at step {i}")
                return None
            prev = addr
            addr = pm.read_longlong(addr + offset)
            logger.debug(f"[{label}] Step {i}: 0x{prev + offset:X} → 0x{addr:X}")

        # The loop only validates a pointer *before* dereferencing it, so the
        # value produced by the last read is still unchecked here. Without
        # this guard a null final pointer would return the bare last offset
        # as if it were a valid address.
        if addr == 0:
            logger.warning(f"[{label}] Null pointer encountered at step {len(offsets) - 1}")
            return None

        final_address = addr + offsets[-1]
        logger.debug(f"[{label}] Final address: 0x{final_address:X}")
        return final_address
    except MemoryReadError as e:
        logger.warning(f"[{label}] DMA address calculation failed: {e}")
        return None

def press_key(window, key):
    """Post a key-down followed by a key-up message for ``key`` to ``window``.

    Args:
        window: Handle of the window that receives the messages.
        key: Single-character string naming the key (e.g. ``"1"`` or ``"q"``).

    Failures are logged and swallowed so that a missed key press never
    interrupts the caller.
    """
    try:
        # Virtual-key codes for letters equal the *uppercase* ASCII values
        # (0x41-0x5A); ord("a") would be 0x61, which is VK_NUMPAD1. Digits are
        # unchanged by upper(), so the configured "1"/"2" keys behave as before.
        vk_code = ord(key.upper())
        win32api.PostMessage(window, win32con.WM_KEYDOWN, vk_code, 0)
        win32api.PostMessage(window, win32con.WM_KEYUP, vk_code, 0)
    except Exception as e:
        logger.error(f"Error pressing key {key}: {e}")

def show_error_popup(message):
    """Display ``message`` in an error dialog titled "Memory Access Error".

    The first call creates a Tk root, hides it, and stores it in the
    module-level ``root``; later calls reuse that root.
    """
    global root
    if root is None:
        # Lazily build the hidden root the first time a popup is requested.
        root = tk.Tk()
        root.withdraw()
    messagebox.showerror(title="Memory Access Error", message=message)

def show_error_popup_async(message):
    """Show the error popup from a freshly started thread and return at once."""
    popup_thread = threading.Thread(target=show_error_popup, args=(message,))
    popup_thread.start()

def find_window_by_partial_title(partial_title):
    """Return the first enumerated window whose title contains ``partial_title``.

    The comparison is case-insensitive. Returns ``None`` when no window title
    matches.
    """
    needle = partial_title.lower()
    matches = []

    def collect_match(hwnd, _extra):
        # Called once per window by EnumWindows; remember every match.
        if needle in win32gui.GetWindowText(hwnd).lower():
            matches.append(hwnd)

    win32gui.EnumWindows(collect_match, None)
    if not matches:
        return None
    return matches[0]

def is_game_running():
    """Report whether a window with "Path of Exile" in its title exists.

    Returns ``True`` when such a window is found. Returns ``False`` (after
    logging an error) when none is found or the lookup raises.
    """
    try:
        if find_window_by_partial_title("Path of Exile"):
            return True
        logger.error("Game window not found. Please start the game and try again.")
        return False
    except Exception as e:
        logger.error(f"Error checking game window: {e}")
        return False

def is_game_active():
    """Return ``True`` unless both current HP and current MP read as zero.

    A ``MemoryReadError`` while reading either value triggers
    ``rehook_pointers()`` and makes the function return ``False``.
    """
    try:
        hp_now = pm.read_int(cur_hp_addr)
        mp_now = pm.read_int(cur_mp_addr)
        # Active as soon as at least one of the two values is non-zero.
        return hp_now != 0 or mp_now != 0
    except MemoryReadError as e:
        logger.warning(f"Memory read failed in is_game_active(): {e}")
        logger.info("Attempting to rehook pointers due to read failure...")
        rehook_pointers()
        return False

def dump_pointer_info():
    """Write the four resolved addresses to ``pointer_debug.txt``.

    The file is overwritten on every call. Each address is written in hex, or
    as the text ``None`` when it is unset (or zero). Any failure is logged as
    a warning and otherwise ignored.
    """
    try:
        with open("pointer_debug.txt", "w") as f:
            labelled = (
                ("Cur HP Addr", cur_hp_addr),
                ("Max HP Addr", max_hp_addr),
                ("Cur MP Addr", cur_mp_addr),
                ("Max MP Addr", max_mp_addr),
            )
            for caption, address in labelled:
                shown = hex(address) if address else 'None'
                f.write(f"{caption}: {shown}\n")
    except Exception as e:
        logger.warning(f"Failed to dump pointers: {e}")

def rehook_pointers():
    """Re-resolve all four HP/MP pointer chains and publish them if they are sane.

    The chains are resolved into temporaries first. The module-level addresses
    and the ``last_max_hp`` / ``last_max_mp`` trackers are only overwritten
    when every chain resolves and the values read through them satisfy
    ``0 <= current <= max``. On any failure the previous globals are left
    untouched, the error is logged, and an error popup is shown from a
    separate thread.

    The current addresses are written to the debug file at the end, whether
    the rehook succeeded or not.
    """
    global cur_hp_addr, max_hp_addr, cur_mp_addr, max_mp_addr, last_max_hp, last_max_mp
    try:
        temp_cur_hp = find_dma_address(pm, base_pointer, OFFSETS_CUR_HP, label="CUR_HP")
        temp_max_hp = find_dma_address(pm, base_pointer, OFFSETS_MAX_HP, label="MAX_HP")
        temp_cur_mp = find_dma_address(pm, base_pointer, OFFSETS_CUR_MP, label="CUR_MP")
        temp_max_mp = find_dma_address(pm, base_pointer, OFFSETS_MAX_MP, label="MAX_MP")

        if None in (temp_cur_hp, temp_max_hp, temp_cur_mp, temp_max_mp):
            # MemoryReadError is constructed elsewhere in this file as
            # (address, length, error_code); building it from a lone message
            # fails inside its constructor and the log then shows that
            # TypeError instead of this text. ValueError carries the message
            # and is handled by the same `except Exception` below.
            raise ValueError("One or more DMA addresses are invalid (None).")

        cur_hp_val = pm.read_int(temp_cur_hp)
        max_hp_val = pm.read_int(temp_max_hp)
        cur_mp_val = pm.read_int(temp_cur_mp)
        max_mp_val = pm.read_int(temp_max_mp)

        # Sanity-check the values before trusting the new addresses.
        if not (0 <= cur_hp_val <= max_hp_val):
            raise ValueError(f"Current HP value {cur_hp_val} out of bounds (0 to {max_hp_val})")
        if not (0 <= cur_mp_val <= max_mp_val):
            raise ValueError(f"Current MP value {cur_mp_val} out of bounds (0 to {max_mp_val})")

        # Everything checked out: publish the new addresses together.
        cur_hp_addr = temp_cur_hp
        max_hp_addr = temp_max_hp
        cur_mp_addr = temp_cur_mp
        max_mp_addr = temp_max_mp

        last_max_hp = max_hp_val
        last_max_mp = max_mp_val

        logger.info(f"Rehook successful. HP ptr: 0x{cur_hp_addr:X}, MP ptr: 0x{cur_mp_addr:X}")
        logger.info(f"HP: {cur_hp_val}/{max_hp_val}, MP: {cur_mp_val}/{max_mp_val}")

    except Exception as e:
        logger.error(f"Failed to rehook memory: {e}")
        show_error_popup_async("Failed to rehook memory. You may need to restart the script or game.")

    dump_pointer_info()

def validate_pointers():
    """Check that the current HP/MP addresses still yield plausible values.

    Reads current and max HP/MP. A change in either max value is logged,
    recorded, and followed by a rehook. A current value outside
    ``0..max`` is treated as corruption: pointers are rehooked and ``False``
    is returned. A ``MemoryReadError`` is handled the same way.

    Returns:
        ``True`` when both current values lie within their bounds, else
        ``False``.
    """
    global last_max_hp, last_max_mp
    try:
        hp_now = pm.read_int(cur_hp_addr)
        hp_cap = pm.read_int(max_hp_addr)
        mp_now = pm.read_int(cur_mp_addr)
        mp_cap = pm.read_int(max_mp_addr)

        if hp_cap != last_max_hp:
            logger.info(f"Max HP changed from {last_max_hp} to {hp_cap}. Rehooking.")
            last_max_hp = hp_cap
            rehook_pointers()

        # last_max_mp may already have been refreshed by the rehook above.
        if mp_cap != last_max_mp:
            logger.info(f"Max MP changed from {last_max_mp} to {mp_cap}. Rehooking.")
            last_max_mp = mp_cap
            rehook_pointers()

        if hp_now < 0 or hp_now > hp_cap:
            logger.warning(f"Corrupt HP: cur_hp={hp_now}, max_hp={hp_cap}, rehooking.")
            rehook_pointers()
            return False

        if mp_now < 0 or mp_now > mp_cap:
            logger.warning(f"Corrupt MP: cur_mp={mp_now}, max_mp={mp_cap}, rehooking.")
            rehook_pointers()
            return False

        return True

    except MemoryReadError as e:
        logger.warning(f"Validation read error: {e}. Rehooking.")
        rehook_pointers()
        return False

async def read_memory_with_retry(pm, addr, retries=3, delay=1):
    """Read a 32-bit integer at ``addr``, retrying on ``MemoryReadError``.

    Args:
        pm: Object exposing ``read_int(address)``.
        addr: Address to read.
        retries: Maximum number of read attempts.
        delay: Seconds to wait between two attempts.

    Returns:
        The integer returned by the first successful read.

    Raises:
        MemoryReadError: When every attempt fails (or ``retries`` is 0). After
            the last failed attempt ``rehook_pointers()`` is called so later
            reads can use fresh addresses; this call still fails because
            ``addr`` itself is the stale address.
    """
    last_error = None
    for attempt in range(retries):
        try:
            return pm.read_int(addr)
        except MemoryReadError as e:
            last_error = e
            logger.warning(f"Memory read failed (Attempt {attempt + 1}/{retries}): {e}")
            if attempt == retries - 1:
                logger.info("Triggering pointer rehook due to repeated read failures...")
                rehook_pointers()
                # No attempt follows, so sleeping here would only postpone
                # the failure that is raised below.
                break
            await asyncio.sleep(delay)
    raise MemoryReadError(addr, struct.calcsize('i'), "Failed after retries") from last_error

def get_random_delay(base, variance=0.15):
    """Return ``base`` shifted by a uniform random amount in ``[-variance, variance]``."""
    jitter = random.uniform(-variance, variance)
    return base + jitter

# ------------------------------------------------------------
# Potion Routines
# ------------------------------------------------------------

async def confirm_hp_flask(current_hp, max_hp):
    """Sample HP six times, 0.3 s apart, and log whether the flask worked.

    The flask counts as confirmed when the highest sample is within 500 of
    ``max_hp``; otherwise a warning is logged. ``current_hp`` is only used in
    the log message as the pre-flask value.
    """
    peak_hp = None
    for _ in range(6):
        sample = await read_memory_with_retry(pm, cur_hp_addr)
        if peak_hp is None or sample > peak_hp:
            peak_hp = sample
        await asyncio.sleep(0.3)

    if peak_hp < max_hp - 500:
        logger.warning(f"HP flask ineffective: {current_hp} → {peak_hp}")
    else:
        logger.info(f"HP flask confirmed: {current_hp} → {peak_hp}")

async def confirm_mp_flask(current_mp, max_mp):
    """Sample MP six times, 0.3 s apart, and log whether the flask worked.

    The flask counts as confirmed when the highest sample is within 100 of
    ``max_mp``; otherwise a warning is logged. ``current_mp`` is only used in
    the log message as the pre-flask value.
    """
    peak_mp = None
    for _ in range(6):
        sample = await read_memory_with_retry(pm, cur_mp_addr)
        if peak_mp is None or sample > peak_mp:
            peak_mp = sample
        await asyncio.sleep(0.3)

    if peak_mp < max_mp - 100:
        logger.warning(f"MP flask ineffective: {current_mp} → {peak_mp}")
    else:
        logger.info(f"MP flask confirmed: {current_mp} → {peak_mp}")

async def hp_routine():
    """Poll HP until ``stop_event`` is set and press the HP flask key when low.

    Each iteration skips work while the game is inactive or pointer validation
    fails. Otherwise it reads current and max HP and, when HP is at or below
    ``HP_THRESHOLD`` and the cooldown has elapsed, presses ``HP_KEY`` and runs
    the confirmation sampling. Any exception shows a popup and exits the
    process. Every iteration ends with a randomised sleep.
    """
    global last_hp_flask_time, last_max_hp
    while not stop_event.is_set():
        try:
            if not is_game_active():
                logger.debug("Game inactive - skipping HP check")
                continue
            if not validate_pointers():
                continue

            hp_now = await read_memory_with_retry(pm, cur_hp_addr)
            hp_cap = await read_memory_with_retry(pm, max_hp_addr)
            checked_at = time.time()

            cooldown_over = (checked_at - last_hp_flask_time) >= HP_FLASK_COOLDOWN
            if hp_now <= HP_THRESHOLD and cooldown_over:
                logger.info(f"Triggering HP flask. HP: {hp_now}/{hp_cap}")
                press_key(game_window, HP_KEY)
                await confirm_hp_flask(hp_now, hp_cap)
                # The cooldown is measured from the moment of the check, not
                # from the end of the confirmation sampling.
                last_hp_flask_time = checked_at

            last_max_hp = hp_cap
        except Exception as e:
            logger.error(f"HP routine error: {e}", exc_info=True)
            show_error_popup("HP monitoring error. Please restart the game.")
            sys.exit(1)
        finally:
            # Runs on every path, including the `continue` branches above.
            await asyncio.sleep(get_random_delay(CHECK_INTERVAL))

async def mp_routine():
    """Poll MP until ``stop_event`` is set and press the MP flask key when low.

    Each iteration skips work while the game is inactive or pointer validation
    fails. Otherwise it reads current and max MP and, when MP is at or below
    ``MP_THRESHOLD`` and the cooldown has elapsed, presses ``MP_KEY`` and runs
    the confirmation sampling. Any exception shows a popup and exits the
    process. Every iteration ends with a randomised sleep.
    """
    global last_mp_flask_time, last_max_mp
    while not stop_event.is_set():
        try:
            if not is_game_active():
                logger.debug("Game inactive - skipping MP check")
                continue
            if not validate_pointers():
                continue

            mp_now = await read_memory_with_retry(pm, cur_mp_addr)
            mp_cap = await read_memory_with_retry(pm, max_mp_addr)
            checked_at = time.time()

            cooldown_over = (checked_at - last_mp_flask_time) >= MP_FLASK_COOLDOWN
            if mp_now <= MP_THRESHOLD and cooldown_over:
                logger.info(f"Triggering MP flask. MP: {mp_now}/{mp_cap}")
                press_key(game_window, MP_KEY)
                await confirm_mp_flask(mp_now, mp_cap)
                # The cooldown is measured from the moment of the check, not
                # from the end of the confirmation sampling.
                last_mp_flask_time = checked_at

            last_max_mp = mp_cap
        except Exception as e:
            logger.error(f"MP routine error: {e}", exc_info=True)
            show_error_popup("MP monitoring error. Please restart the game.")
            sys.exit(1)
        finally:
            # Runs on every path, including the `continue` branches above.
            await asyncio.sleep(get_random_delay(CHECK_INTERVAL))

# ------------------------------------------------------------
# Initialization and Entry
# ------------------------------------------------------------

def init():
    """Attach to the game process and resolve the memory pointers.

    Exits the process with status 1 when the game window is missing, when
    attaching or window lookup fails, or when any of the four addresses is
    still ``None`` after the first rehook. On success ``pm``,
    ``base_pointer`` and ``game_window`` are set and the max HP/MP values are
    logged.
    """
    global pm, base_pointer, game_window

    if not is_game_running():
        sys.exit(1)

    try:
        pm = Pymem(PROCESS_NAME)
        base_pointer = pm.base_address + BASE_POINTER_OFFSET

        game_window = find_window_by_partial_title("Path of Exile")
        if not game_window:
            raise Exception("Game window not found.")

        rehook_pointers()

        resolved = (cur_hp_addr, max_hp_addr, cur_mp_addr, max_mp_addr)
        if any(address is None for address in resolved):
            logger.error("Failed to initialize memory pointers. Exiting.")
            show_error_popup_async("Critical memory pointers could not be found.\nPlease restart the script or wait for an update.")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            sys.exit(1)

        max_hp_now = pm.read_int(max_hp_addr)
        max_mp_now = pm.read_int(max_mp_addr)
        logger.info(f"Initialized. Max HP: {max_hp_now}, Max MP: {max_mp_now}")

    except Exception as e:
        logger.error(f"Initialization failed: {e}")
        show_error_popup_async("Initialization error. Please restart the game.")
        sys.exit(1)

async def start_script():
    """Initialise the script, then run the HP and MP routines concurrently."""
    init()
    routines = (hp_routine(), mp_routine())
    await asyncio.gather(*routines)

# ------------------------------------------------------------
# Main Entry Point
# ------------------------------------------------------------

if __name__ == "__main__":
    # Run the monitoring coroutines until they finish, the user interrupts
    # them, or an unexpected error escapes.
    try:
        asyncio.run(start_script())
    except KeyboardInterrupt:
        logger.info("Script terminated by user.")
        # asyncio.run() has already returned at this point, so this only
        # records the stop request on the module-level event.
        stop_event.set()
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        show_error_popup_async(f"Unexpected error: {e}")
        sys.exit(1)
    finally:
        logger.info("Shutting down gracefully.")
    # This cleanup sits after the try statement, not inside `finally`: it is
    # skipped whenever the try statement exits by raising, e.g. the
    # sys.exit(1) in the handler above.
    if root:
        try:
            root.quit()
            root.destroy()
        except Exception as e:
            logger.warning(f"Failed to destroy Tkinter root: {e}")
Editor is loading...
Leave a Comment