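# Paste-site metadata (not part of the program):
#   title: Untitled
#   author: unknown (avatar)
#   format: plain_text
#   posted: a year ago
#   size: 18 kB
#   views: 16
#   Indexable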
#!/usr/bin/env python3
"""
A complete Wumpus World Solver using a Unified Complexity Framework Agent.
This is a fully functional implementation to demonstrate the UCF principles
on a classic, agent-based, logical reasoning problem.

Author: Andrew Scott Gracey & Gemini
Copyright © 2025. All Rights Reserved.
"""

import numpy as np
import time
import os
import random
import traceback
from collections import deque

# --- Attempt to import the core UCF engine ---
# The engine lives in a project-local module; without it the simulation
# cannot run, so a failed import terminates the process with status 1.
try:
    from ucf_implementation1WINNERtest300500winnertestpreprocesstimeseries15tabular1 import UnifiedComplexityFramework
except ImportError:
    print("❌ CRITICAL ERROR: Could not find 'ucf_implementation1WINNER...'.py")
    print("   Please ensure this script is in the same directory as your UCF file.")
    # `exit()` is a helper injected by the `site` module for interactive use
    # and can be missing (e.g. `python -S`); raising SystemExit always works.
    raise SystemExit(1)
else:
    print("✅ Successfully imported UCF engine.")

# --- The Wumpus World Environment ---
class WumpusWorld:
    """Ground-truth Wumpus World environment with the classic fixed layout.

    Squares are addressed as ``(row, col)``; ``print_state`` draws row 0 at
    the top. ``self.world`` encodes 0 = empty, 1 = pit, 2 = Wumpus, 3 = gold.
    The agent heading is an index: 0:Right, 1:Up, 2:Left, 3:Down.
    """

    # (row, col) step for each heading index; "Up" decreases the row.
    _HEADING_STEPS = [(0, 1), (-1, 0), (0, -1), (1, 0)]

    def __init__(self, size=4):
        """Create the world and print a summary of the layout.

        Args:
            size: Side length of the square grid. The hazard and gold
                coordinates are hard-coded and reach index 3, so values
                below 4 fail when the grid is populated.
        """
        self.size = size
        self.agent_pos = [0, 0]
        self.agent_dir = 0  # 0:Right, 1:Up, 2:Left, 3:Down
        self.agent_has_arrow = True
        self.agent_has_gold = False
        self.wumpus_is_alive = True
        self.game_over = False

        # Classic Wumpus World layout
        self.world = np.zeros((size, size), dtype=int)
        self.pit_pos = [(2, 0), (2, 2), (3, 3)]
        self.wumpus_pos = (0, 2)
        self.gold_pos = (1, 2)

        for p in self.pit_pos:
            self.world[p] = 1
        self.world[self.wumpus_pos] = 2
        self.world[self.gold_pos] = 3

        print("--- Wumpus World Initialized (Classic Layout) ---")
        print(f"Agent starts at: {self.agent_pos}")
        print(f"Gold is at: {self.gold_pos}, Wumpus is at: {self.wumpus_pos}, Pits at: {self.pit_pos}")
        print("-------------------------------------------------")

    def get_percepts(self):
        """Return what the agent senses on its current square.

        Returns:
            dict with boolean entries ``'stench'`` (a living Wumpus is on an
            orthogonally adjacent square), ``'breeze'`` (a pit is on an
            adjacent square) and ``'glitter'`` (the gold is on this square
            and has not been picked up yet).
        """
        percepts = {'stench': False, 'breeze': False, 'glitter': False}
        x, y = self.agent_pos

        # Glitter disappears once the gold has been grabbed; otherwise the
        # percept would contradict the world grid and print_state().
        if tuple(self.agent_pos) == self.gold_pos and not self.agent_has_gold:
            percepts['glitter'] = True

        # Check adjacent squares for stench and breeze
        for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.size and 0 <= ny < self.size:
                if self.world[nx, ny] == 2 and self.wumpus_is_alive:
                    percepts['stench'] = True
                if self.world[nx, ny] == 1:
                    percepts['breeze'] = True
        return percepts

    def execute_action(self, action):
        """Apply one agent action to the world.

        Args:
            action: One of ``'move_forward'``, ``'turn_left'``,
                ``'turn_right'``, ``'grab'``, ``'shoot'`` or ``'climb'``.
                Any other value only incurs the step cost.

        Returns:
            ``(reward, game_over, scream)``. ``reward`` starts at -1 per
            action, gains 100 for killing the Wumpus and 1000 for climbing
            out with the gold, and is set to -1000 on death. ``scream`` is
            True only for the action whose arrow kills the Wumpus.
        """
        reward = -1  # Cost of each action
        x, y = self.agent_pos

        if action == 'move_forward':
            dx, dy = self._HEADING_STEPS[self.agent_dir]
            new_x, new_y = x + dx, y + dy
            # A move into a wall leaves the agent where it is.
            if 0 <= new_x < self.size and 0 <= new_y < self.size:
                self.agent_pos = [new_x, new_y]

        elif action == 'turn_left':
            self.agent_dir = (self.agent_dir + 1) % 4

        elif action == 'turn_right':
            self.agent_dir = (self.agent_dir - 1 + 4) % 4

        elif action == 'grab':
            # Only the first grab on the gold square has any effect.
            if tuple(self.agent_pos) == self.gold_pos and not self.agent_has_gold:
                self.agent_has_gold = True
                self.world[self.gold_pos] = 0  # Remove gold from world
                print("🏆 Agent grabbed the Gold!")

        elif action == 'shoot':
            if self.agent_has_arrow:
                self.agent_has_arrow = False
                dx, dy = self._HEADING_STEPS[self.agent_dir]
                shot_x, shot_y = x + dx, y + dy

                # The arrow flies in a straight line until it leaves the grid.
                while 0 <= shot_x < self.size and 0 <= shot_y < self.size:
                    if (shot_x, shot_y) == self.wumpus_pos and self.wumpus_is_alive:
                        self.wumpus_is_alive = False
                        reward += 100
                        print("😱 Agent hears a terrible scream! The Wumpus is dead.")
                        return reward, self.game_over, True  # Return scream=True
                    shot_x += dx
                    shot_y += dy

        elif action == 'climb':
            # Climbing is only possible from the start square.
            if self.agent_pos == [0, 0]:
                self.game_over = True
                if self.agent_has_gold:
                    reward += 1000
                    print("🧗 Agent climbed out. FINAL STATUS: SUCCESS!")
                else:
                    print("🧗 Agent climbed out. FINAL STATUS: SURVIVED (no gold)")

        # Check for death conditions
        current_pos = tuple(self.agent_pos)
        if (self.wumpus_is_alive and current_pos == self.wumpus_pos) or self.world[current_pos] == 1:
            reward = -1000
            self.game_over = True
            print(f"☠️ AGENT DIED at {current_pos}. GAME OVER.")

        return reward, self.game_over, False

    def print_state(self):
        """Print a visual representation of the true world state"""
        grid = [[' ' for _ in range(self.size)] for _ in range(self.size)]

        # Place pits
        for r, c in self.pit_pos:
            grid[r][c] = 'P'

        # Place Wumpus (if alive)
        if self.wumpus_is_alive:
            grid[self.wumpus_pos[0]][self.wumpus_pos[1]] = 'W'

        # Place Gold (if not taken)
        if not self.agent_has_gold:
            grid[self.gold_pos[0]][self.gold_pos[1]] = 'G'

        # Place Agent last so it is drawn over anything sharing its square.
        ax, ay = self.agent_pos
        dirs = ['→', '↑', '←', '↓']
        agent_char = f"A{dirs[self.agent_dir]}"
        grid[ax][ay] = agent_char

        # Print the grid
        print("\n--- True World State ---")
        for row in grid:
            print('+----' * self.size + '+')
            print('|' + '|'.join(cell.center(4) for cell in row) + '|')
        print('+----' * self.size + '+')


# --- The UCF-Powered Intelligent Agent ---
class UCFWumpusAgent:
    """Wumpus World agent that keeps a belief map and scores it with a UCF engine.

    The agent tracks its own pose and inventory, maintains
    ``world_knowledge`` (-1=Unknown, 0=Safe, 1=Pit-Risk, 2=Wumpus-Risk) and
    only ever steps onto squares it believes are safe. Squares are
    ``(row, col)`` tuples; headings are 0:Right, 1:Up, 2:Left, 3:Down.
    """

    # (row, col) step for each heading index; "Up" decreases the row.
    _HEADING_STEPS = [(0, 1), (-1, 0), (0, -1), (1, 0)]

    def __init__(self, size, ucf_engine):
        """Set up an agent at (0, 0) facing right with an empty belief map.

        Args:
            size: Side length of the square grid.
            ucf_engine: Object exposing
                ``compute_complexity(grid, return_components=True)``.
        """
        self.size = size
        self.ucf = ucf_engine
        self.agent_pos = (0, 0)
        self.agent_dir = 0
        self.has_gold = False
        self.has_arrow = True
        self.wumpus_is_dead = False

        # Knowledge representation: -1=Unknown, 0=Safe, 1=Pit-Risk, 2=Wumpus-Risk
        self.world_knowledge = np.full((size, size), -1, dtype=int)
        self.visited = set()

        # Hazards are excluded independently: a single code per square cannot
        # remember that a square was suspected of BOTH a pit and the Wumpus.
        self._pit_free = set()
        self._wumpus_free = set()

        print("🤖 UCF Agent initialized. Ready to reason with complexity minimization.")

    def get_state_profile(self, state_grid):
        """Use UCF to measure the complexity of a knowledge state.

        Returns the absolute value of the first element returned by the
        engine, or ``inf`` if the engine raises.
        """
        try:
            complexity_value, _ = self.ucf.compute_complexity(state_grid, return_components=True)
            return abs(complexity_value)
        except Exception:
            return float('inf')  # Invalid states have infinite complexity

    def _classify(self, square):
        """Return the knowledge code for a square that has a visited neighbour.

        Every visited neighbour without stench/breeze has already put the
        square into the matching exclusion set, so "not excluded" means every
        visited neighbour reported that percept.
        """
        if square in self.visited:
            return 0
        if not self.wumpus_is_dead and square not in self._wumpus_free:
            return 2
        if square not in self._pit_free:
            return 1
        return 0

    def update_knowledge(self, percepts):
        """Fold the percepts sensed on the current square into the belief map.

        Args:
            percepts: dict with boolean ``'stench'`` and ``'breeze'`` entries.

        The current square becomes visited and safe. Each adjacent square is
        then re-labelled from the per-hazard exclusion sets, so clearing one
        hazard never erases a suspicion about the other.
        """
        x, y = self.agent_pos
        here = (x, y)
        self.visited.add(here)
        self._pit_free.add(here)
        self._wumpus_free.add(here)
        self.world_knowledge[x, y] = 0  # Current square is definitely safe

        adj_squares = self.get_adjacent_squares(here)

        # The absence of a percept rules that hazard out for every neighbour.
        if not percepts['breeze']:
            self._pit_free.update(adj_squares)
        if not percepts['stench']:
            self._wumpus_free.update(adj_squares)

        for square in adj_squares:
            self.world_knowledge[square] = self._classify(square)

        # Once the Wumpus is dead no square can still carry Wumpus risk.
        if self.wumpus_is_dead:
            for r in range(self.size):
                for c in range(self.size):
                    if self.world_knowledge[r, c] == 2:
                        self.world_knowledge[r, c] = self._classify((r, c))

    def decide_action(self, percepts):
        """Update beliefs from ``percepts`` and return the next action name.

        Priority order: climb out when holding the gold at (0, 0); otherwise
        head home when holding the gold; otherwise grab on glitter; otherwise
        pick the move/turn whose imagined belief map has the lowest complexity.
        """
        self.update_knowledge(percepts)

        # PRIORITY 1: Check internal state first - if we have gold and are home, climb out
        if self.has_gold and self.agent_pos == (0, 0):
            return 'climb'

        # PRIORITY 2: Goal-oriented behavior (return home with gold)
        if self.has_gold:
            print("REASONING: I have the gold. Finding safest path home.")
            return self.navigate_home()

        # PRIORITY 3: Grab gold if we see it and don't have it yet
        if percepts['glitter']:
            return 'grab'

        # PRIORITY 4: UCF-driven exploration
        current_complexity = self.get_state_profile(self.world_knowledge)
        print(f"ANALYSIS: My current worldview has complexity: {current_complexity:.4f}")
        print("REASONING: Exploring to reduce uncertainty and find the gold...")

        # Consider all possible actions; infinite complexity marks a move
        # that is out of bounds or not known to be safe.
        action_outcomes = []
        for action in ('move_forward', 'turn_left', 'turn_right'):
            imagined_complexity = self.imagine_action_outcome(action)
            if imagined_complexity < float('inf'):
                action_outcomes.append({
                    'action': action,
                    'complexity': imagined_complexity,
                })
                print(f"  - Imagining '{action}' -> complexity: {imagined_complexity:.4f}")

        # If no safe moves, just turn
        if not action_outcomes:
            print("  ⚠️ No safe moves available. Turning to reassess.")
            return 'turn_left'

        # Choose the action that leads to the simplest worldview
        # (ties go to the earliest candidate in the order tried above).
        best_outcome = min(action_outcomes, key=lambda x: x['complexity'])
        best_action = best_outcome['action']

        print(f"==> DECISION: Chose '{best_action}' (leads to simplest worldview)")
        return best_action

    def imagine_action_outcome(self, action):
        """Return the complexity score the agent expects after ``action``.

        Turns score the current belief map plus 0.001 so that an available
        forward move is preferred. A forward move scores ``inf`` unless the
        square ahead is on the grid and marked safe. Unknown actions score
        ``inf``.
        """
        if action in ['turn_left', 'turn_right']:
            # Turning doesn't change knowledge but has slight cost to prefer movement
            return self.get_state_profile(self.world_knowledge) + 0.001

        elif action == 'move_forward':
            dx, dy = self._HEADING_STEPS[self.agent_dir]
            next_pos = (self.agent_pos[0] + dx, self.agent_pos[1] + dy)

            # Check if move is within bounds and safe
            if not (0 <= next_pos[0] < self.size and 0 <= next_pos[1] < self.size):
                return float('inf')  # Can't move out of bounds

            if self.world_knowledge[next_pos] != 0:
                return float('inf')  # Only move to known safe squares

            # Create hypothetical world where we've explored the new square
            hypothetical_world = self.world_knowledge.copy()
            hypothetical_world[next_pos] = 0  # Mark as explored and safe
            return self.get_state_profile(hypothetical_world)

        return float('inf')

    def navigate_home(self):
        """Return the action for the next step of a safe path back to (0, 0).

        Falls back to ``'turn_left'`` when no path is found.
        """
        path = self.find_path_to_target((0, 0))
        if path and len(path) > 1:
            return self.get_action_for_next_step(path[1])
        else:
            # No safe path found, turn to look for alternatives
            return 'turn_left'

    def find_path_to_target(self, target_pos):
        """Breadth-first search from the agent to ``target_pos``.

        Intermediate squares must be marked safe in ``world_knowledge``; the
        target itself is accepted as soon as it is adjacent to the frontier,
        without consulting the belief map.

        Returns:
            List of squares from the agent's position to the target
            (inclusive), or ``None`` when the target is unreachable.
        """
        if self.agent_pos == target_pos:
            return [self.agent_pos]

        queue = deque([[self.agent_pos]])
        visited = {self.agent_pos}

        while queue:
            path = queue.popleft()
            current_pos = path[-1]

            for neighbor in self.get_adjacent_squares(current_pos):
                if neighbor == target_pos:
                    return path + [neighbor]

                if neighbor not in visited and self.world_knowledge[neighbor] == 0:
                    visited.add(neighbor)
                    queue.append(path + [neighbor])

        return None  # No path found

    def get_action_for_next_step(self, next_pos):
        """Return the action that moves the agent toward adjacent ``next_pos``.

        Returns ``'move_forward'`` when already facing the square, otherwise
        the turn that reaches the required heading in the fewest steps.
        ``'turn_left'`` is returned when ``next_pos`` is not orthogonally
        adjacent.
        """
        dx = next_pos[0] - self.agent_pos[0]
        dy = next_pos[1] - self.agent_pos[1]

        # Direction mappings
        target_dirs = {(0, 1): 0, (-1, 0): 1, (0, -1): 2, (1, 0): 3}
        required_dir = target_dirs.get((dx, dy))

        if required_dir is None:
            return 'turn_left'  # Invalid move

        if self.agent_dir == required_dir:
            return 'move_forward'

        # turn_right decrements the heading; use it when one right turn is
        # enough instead of spending three left turns.
        if (self.agent_dir - 1) % 4 == required_dir:
            return 'turn_right'
        return 'turn_left'

    def update_agent_state(self, action):
        """Mirror the effect of an executed ``action`` on the agent's own state.

        A forward move that would leave the grid keeps the position
        unchanged, matching how the environment treats a wall bump.
        """
        if action == 'turn_left':
            self.agent_dir = (self.agent_dir + 1) % 4
        elif action == 'turn_right':
            self.agent_dir = (self.agent_dir - 1 + 4) % 4
        elif action == 'move_forward':
            dx, dy = self._HEADING_STEPS[self.agent_dir]
            new_x, new_y = self.agent_pos[0] + dx, self.agent_pos[1] + dy
            if 0 <= new_x < self.size and 0 <= new_y < self.size:
                self.agent_pos = (new_x, new_y)
        elif action == 'grab':
            self.has_gold = True
        elif action == 'shoot':
            self.has_arrow = False

    def get_adjacent_squares(self, pos):
        """Return the in-bounds orthogonal neighbours of ``pos`` as a list."""
        x, y = pos
        adjacent = []
        for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.size and 0 <= ny < self.size:
                adjacent.append((nx, ny))
        return adjacent

    def print_knowledge(self):
        """Print the agent's mental map"""
        mapping = {-1: '?', 0: 'S', 1: 'P?', 2: 'W?'}
        dirs = ['→', '↑', '←', '↓']
        print("--- Agent's Knowledge ---")
        for r in range(self.size):
            row_str = []
            for c in range(self.size):
                if (r, c) == self.agent_pos:
                    row_str.append(f"A{dirs[self.agent_dir]}")
                else:
                    row_str.append(mapping.get(int(self.world_knowledge[r, c]), 'X'))
            print('|' + '|'.join(cell.center(4) for cell in row_str) + '|')
        print("-------------------------")


# --- Main Game Loop ---
if __name__ == "__main__":
    # Script entry point: run one agent through the fixed world for at most
    # 50 sense/decide/act cycles, printing both the true state and the
    # agent's belief map before every decision.
    try:
        # Environment first, then the complexity engine, then the agent.
        world = WumpusWorld(size=4)
        ucf_engine = UnifiedComplexityFramework(domain='images')
        agent = UCFWumpusAgent(size=4, ucf_engine=ucf_engine)

        step = 0
        while step < 50:
            step += 1
            rule = "=" * 15
            print("\n" + rule + f" STEP {step} " + rule)

            # Ground truth followed by what the agent currently believes.
            world.print_state()
            agent.print_knowledge()

            # Sense -> decide.
            percepts = world.get_percepts()
            print(f"PERCEPTS at {agent.agent_pos}: {percepts}")
            action = agent.decide_action(percepts)
            print(f"==> AGENT'S FINAL ACTION: {action.upper()}")

            # Act, then bring the agent's bookkeeping in line with the
            # outcome (the scream flag must be recorded before the pose).
            _, game_over, scream = world.execute_action(action)
            if scream:
                agent.wumpus_is_dead = True
            agent.update_agent_state(action)

            if not game_over:
                time.sleep(0.5)  # Slow the output down so it can be read
                continue

            # Terminal step: show the final board and stop.
            print("\n" + "=" * 10 + " SIMULATION ENDED " + "=" * 10)
            world.print_state()
            break

        # Reached only with the game still open when the step budget ran out.
        if not world.game_over:
            print("\nSimulation ended: Agent did not complete the objective in time.")

    except Exception as e:
        # Top-level boundary: report the failure with its traceback.
        print(f"\nAn error occurred in the simulation: {e}")
        traceback.print_exc()
# Paste-site footer (not part of the program): "Editor is loading..." / "Leave a Comment"