Untitled
unknown
plain_text
a year ago
18 kB
16
Indexable
#!/usr/bin/env python3
"""
A complete Wumpus World Solver using a Unified Complexity Framework Agent.
This is a fully functional implementation to demonstrate the UCF principles
on a classic, agent-based, logical reasoning problem.
Author: Andrew Scott Gracey & Gemini
Copyright © 2025. All Rights Reserved.
"""
import numpy as np
import time
import os
import random
import traceback
from collections import deque
# --- Attempt to import the core UCF engine ---
try:
from ucf_implementation1WINNERtest300500winnertestpreprocesstimeseries15tabular1 import UnifiedComplexityFramework
print("✅ Successfully imported UCF engine.")
except ImportError:
print("❌ CRITICAL ERROR: Could not find 'ucf_implementation1WINNER...'.py")
print(" Please ensure this script is in the same directory as your UCF file.")
exit(1)
# --- The Wumpus World Environment ---
class WumpusWorld:
def __init__(self, size=4):
self.size = size
self.agent_pos = [0, 0]
self.agent_dir = 0 # 0:Right, 1:Up, 2:Left, 3:Down
self.agent_has_arrow = True
self.agent_has_gold = False
self.wumpus_is_alive = True
self.game_over = False
# Classic Wumpus World layout
self.world = np.zeros((size, size), dtype=int)
self.pit_pos = [(2, 0), (2, 2), (3, 3)]
self.wumpus_pos = (0, 2)
self.gold_pos = (1, 2)
for p in self.pit_pos:
self.world[p] = 1
self.world[self.wumpus_pos] = 2
self.world[self.gold_pos] = 3
print("--- Wumpus World Initialized (Classic Layout) ---")
print(f"Agent starts at: {self.agent_pos}")
print(f"Gold is at: {self.gold_pos}, Wumpus is at: {self.wumpus_pos}, Pits at: {self.pit_pos}")
print("-------------------------------------------------")
def get_percepts(self):
percepts = {'stench': False, 'breeze': False, 'glitter': False}
x, y = self.agent_pos
# Check for glitter (gold in current square)
if tuple(self.agent_pos) == self.gold_pos:
percepts['glitter'] = True
# Check adjacent squares for stench and breeze
for dx, dy in [(0,1), (0,-1), (1,0), (-1,0)]:
nx, ny = x + dx, y + dy
if 0 <= nx < self.size and 0 <= ny < self.size:
if self.world[nx, ny] == 2 and self.wumpus_is_alive:
percepts['stench'] = True
if self.world[nx, ny] == 1:
percepts['breeze'] = True
return percepts
def execute_action(self, action):
reward = -1 # Cost of each action
x, y = self.agent_pos
dirs = [(0, 1), (-1, 0), (0, -1), (1, 0)] # Right, Up, Left, Down
if action == 'move_forward':
dx, dy = dirs[self.agent_dir]
new_x, new_y = x + dx, y + dy
# Check bounds
if 0 <= new_x < self.size and 0 <= new_y < self.size:
self.agent_pos = [new_x, new_y]
elif action == 'turn_left':
self.agent_dir = (self.agent_dir + 1) % 4
elif action == 'turn_right':
self.agent_dir = (self.agent_dir - 1 + 4) % 4
elif action == 'grab':
if tuple(self.agent_pos) == self.gold_pos:
self.agent_has_gold = True
self.world[self.gold_pos] = 0 # Remove gold from world
print("🏆 Agent grabbed the Gold!")
elif action == 'shoot':
if self.agent_has_arrow:
self.agent_has_arrow = False
# Shoot in current direction
dx, dy = dirs[self.agent_dir]
shot_x, shot_y = x, y
# Arrow travels until it hits something or goes out of bounds
while True:
shot_x += dx
shot_y += dy
# Check if arrow went out of bounds
if not (0 <= shot_x < self.size and 0 <= shot_y < self.size):
break
# Check if arrow hit the Wumpus
if (shot_x, shot_y) == self.wumpus_pos and self.wumpus_is_alive:
self.wumpus_is_alive = False
reward += 100
print("😱 Agent hears a terrible scream! The Wumpus is dead.")
return reward, self.game_over, True # Return scream=True
elif action == 'climb':
if self.agent_pos == [0, 0]:
self.game_over = True
if self.agent_has_gold:
reward += 1000
print("🧗 Agent climbed out. FINAL STATUS: SUCCESS!")
else:
print("🧗 Agent climbed out. FINAL STATUS: SURVIVED (no gold)")
# Check for death conditions
current_pos = tuple(self.agent_pos)
if (self.wumpus_is_alive and current_pos == self.wumpus_pos) or self.world[current_pos] == 1:
reward = -1000
self.game_over = True
print(f"☠️ AGENT DIED at {current_pos}. GAME OVER.")
return reward, self.game_over, False
def print_state(self):
"""Print a visual representation of the true world state"""
grid = [[' ' for _ in range(self.size)] for _ in range(self.size)]
# Place pits
for r, c in self.pit_pos:
grid[r][c] = 'P'
# Place Wumpus (if alive)
if self.wumpus_is_alive:
grid[self.wumpus_pos[0]][self.wumpus_pos[1]] = 'W'
# Place Gold (if not taken)
if not self.agent_has_gold:
grid[self.gold_pos[0]][self.gold_pos[1]] = 'G'
# Place Agent
ax, ay = self.agent_pos
dirs = ['→', '↑', '←', '↓']
agent_char = f"A{dirs[self.agent_dir]}"
grid[ax][ay] = agent_char
# Print the grid
print("\n--- True World State ---")
for row in grid:
print('+----' * self.size + '+')
print('|' + '|'.join(cell.center(4) for cell in row) + '|')
print('+----' * self.size + '+')
# --- The UCF-Powered Intelligent Agent ---
class UCFWumpusAgent:
def __init__(self, size, ucf_engine):
self.size = size
self.ucf = ucf_engine
self.agent_pos = (0, 0)
self.agent_dir = 0
self.has_gold = False
self.has_arrow = True
self.wumpus_is_dead = False
# Knowledge representation: -1=Unknown, 0=Safe, 1=Pit-Risk, 2=Wumpus-Risk
self.world_knowledge = np.full((size, size), -1, dtype=int)
self.visited = set()
print(f"🤖 UCF Agent initialized. Ready to reason with complexity minimization.")
def get_state_profile(self, state_grid):
"""Use UCF to measure the complexity of a knowledge state"""
try:
complexity_value, _ = self.ucf.compute_complexity(state_grid, return_components=True)
return abs(complexity_value)
except Exception:
return float('inf') # Invalid states have infinite complexity
def update_knowledge(self, percepts):
"""Update knowledge with proper danger detection priority"""
x, y = self.agent_pos
self.visited.add((x, y))
self.world_knowledge[x, y] = 0 # Current square is definitely safe
adj_squares = self.get_adjacent_squares((x, y))
# PRIORITY 1: DETECT DANGERS FIRST (mark risky squares)
if percepts['stench'] and not self.wumpus_is_dead:
for nx, ny in adj_squares:
if self.world_knowledge[nx, ny] == -1: # Only mark unknown squares
self.world_knowledge[nx, ny] = 2 # Mark as Wumpus-risk
if percepts['breeze']:
for nx, ny in adj_squares:
if self.world_knowledge[nx, ny] == -1: # Only mark unknown squares
self.world_knowledge[nx, ny] = 1 # Mark as Pit-risk
# PRIORITY 2: DEDUCE SAFETY (but don't override danger markings)
if not percepts['stench'] and not self.wumpus_is_dead:
for nx, ny in adj_squares:
if self.world_knowledge[nx, ny] in [-1, 2]: # Unknown or Wumpus-risk only
self.world_knowledge[nx, ny] = 0 # Mark as safe from Wumpus
if not percepts['breeze']:
for nx, ny in adj_squares:
if self.world_knowledge[nx, ny] in [-1, 1]: # Unknown or Pit-risk only
if self.world_knowledge[nx, ny] != 2: # Don't override Wumpus-risk
self.world_knowledge[nx, ny] = 0 # Mark as safe from Pit
def decide_action(self, percepts):
"""The main reasoning engine using UCF complexity minimization"""
self.update_knowledge(percepts)
# PRIORITY 1: Check internal state first - if we have gold and are home, climb out
if self.has_gold and self.agent_pos == (0, 0):
return 'climb'
# PRIORITY 2: Goal-oriented behavior (return home with gold)
if self.has_gold:
print("REASONING: I have the gold. Finding safest path home.")
return self.navigate_home()
# PRIORITY 3: Grab gold if we see it and don't have it yet
if percepts['glitter'] and not self.has_gold:
return 'grab'
# PRIORITY 4: UCF-driven exploration
current_complexity = self.get_state_profile(self.world_knowledge)
print(f"ANALYSIS: My current worldview has complexity: {current_complexity:.4f}")
print("REASONING: Exploring to reduce uncertainty and find the gold...")
# Consider all possible actions
action_outcomes = []
possible_actions = ['move_forward', 'turn_left', 'turn_right']
for action in possible_actions:
imagined_complexity = self.imagine_action_outcome(action)
if imagined_complexity < float('inf'):
action_outcomes.append({
'action': action,
'complexity': imagined_complexity
})
print(f" - Imagining '{action}' -> complexity: {imagined_complexity:.4f}")
# If no safe moves, just turn
if not action_outcomes:
print(" ⚠️ No safe moves available. Turning to reassess.")
return 'turn_left'
# Choose the action that leads to the simplest worldview
best_outcome = min(action_outcomes, key=lambda x: x['complexity'])
best_action = best_outcome['action']
print(f"==> DECISION: Chose '{best_action}' (leads to simplest worldview)")
return best_action
def imagine_action_outcome(self, action):
"""Imagine the complexity of the world after taking an action"""
if action in ['turn_left', 'turn_right']:
# Turning doesn't change knowledge but has slight cost to prefer movement
return self.get_state_profile(self.world_knowledge) + 0.001
elif action == 'move_forward':
dirs = [(0, 1), (-1, 0), (0, -1), (1, 0)]
dx, dy = dirs[self.agent_dir]
next_pos = (self.agent_pos[0] + dx, self.agent_pos[1] + dy)
# Check if move is within bounds and safe
if not (0 <= next_pos[0] < self.size and 0 <= next_pos[1] < self.size):
return float('inf') # Can't move out of bounds
if self.world_knowledge[next_pos] != 0:
return float('inf') # Only move to known safe squares
# Create hypothetical world where we've explored the new square
hypothetical_world = self.world_knowledge.copy()
hypothetical_world[next_pos] = 0 # Mark as explored and safe
return self.get_state_profile(hypothetical_world)
return float('inf')
def navigate_home(self):
"""Find path back to (0,0) using only known safe squares"""
path = self.find_path_to_target((0, 0))
if path and len(path) > 1:
return self.get_action_for_next_step(path[1])
else:
# No safe path found, turn to look for alternatives
return 'turn_left'
def find_path_to_target(self, target_pos):
"""BFS pathfinding on known safe squares"""
if self.agent_pos == target_pos:
return [self.agent_pos]
queue = deque([[self.agent_pos]])
visited = {self.agent_pos}
while queue:
path = queue.popleft()
current_pos = path[-1]
for neighbor in self.get_adjacent_squares(current_pos):
if neighbor == target_pos:
return path + [neighbor]
if neighbor not in visited and self.world_knowledge[neighbor] == 0:
visited.add(neighbor)
new_path = path + [neighbor]
queue.append(new_path)
return None # No path found
def get_action_for_next_step(self, next_pos):
"""Convert next position into the required action"""
dx = next_pos[0] - self.agent_pos[0]
dy = next_pos[1] - self.agent_pos[1]
# Direction mappings
target_dirs = {(0, 1): 0, (-1, 0): 1, (0, -1): 2, (1, 0): 3}
required_dir = target_dirs.get((dx, dy))
if required_dir is None:
return 'turn_left' # Invalid move
if self.agent_dir == required_dir:
return 'move_forward'
else:
# Simple turning logic - could be optimized
return 'turn_left'
def update_agent_state(self, action):
"""Update internal state after action is executed"""
if action == 'turn_left':
self.agent_dir = (self.agent_dir + 1) % 4
elif action == 'turn_right':
self.agent_dir = (self.agent_dir - 1 + 4) % 4
elif action == 'move_forward':
dirs = [(0, 1), (-1, 0), (0, -1), (1, 0)]
dx, dy = dirs[self.agent_dir]
self.agent_pos = (self.agent_pos[0] + dx, self.agent_pos[1] + dy)
elif action == 'grab':
self.has_gold = True
elif action == 'shoot':
self.has_arrow = False
def get_adjacent_squares(self, pos):
"""Get all valid adjacent squares"""
x, y = pos
adjacent = []
for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
nx, ny = x + dx, y + dy
if 0 <= nx < self.size and 0 <= ny < self.size:
adjacent.append((nx, ny))
return adjacent
def print_knowledge(self):
"""Print the agent's mental map"""
mapping = {-1: '?', 0: 'S', 1: 'P?', 2: 'W?'}
print("--- Agent's Knowledge ---")
for r in range(self.size):
row_str = []
for c in range(self.size):
if (r, c) == self.agent_pos:
dirs = ['→', '↑', '←', '↓']
row_str.append(f"A{dirs[self.agent_dir]}")
else:
row_str.append(mapping.get(self.world_knowledge[r, c], 'X'))
print('|' + '|'.join(cell.center(4) for cell in row_str) + '|')
print("-------------------------")
# --- Main Game Loop ---
if __name__ == "__main__":
try:
# Initialize the world and agent
world = WumpusWorld(size=4)
ucf_engine = UnifiedComplexityFramework(domain='images')
agent = UCFWumpusAgent(size=4, ucf_engine=ucf_engine)
# Run the simulation
for step in range(1, 51): # Increased step limit
print(f"\n" + "="*15 + f" STEP {step} " + "="*15)
# Show current state
world.print_state()
agent.print_knowledge()
# Agent senses, thinks, and acts
percepts = world.get_percepts()
print(f"PERCEPTS at {agent.agent_pos}: {percepts}")
action = agent.decide_action(percepts)
print(f"==> AGENT'S FINAL ACTION: {action.upper()}")
# Execute action in the world
reward, game_over, scream = world.execute_action(action)
# Update agent's internal state
if scream:
agent.wumpus_is_dead = True
agent.update_agent_state(action)
# Check for game end
if game_over:
print("\n" + "="*10 + " SIMULATION ENDED " + "="*10)
world.print_state()
break
time.sleep(0.5) # Pause for readability
if not world.game_over:
print("\nSimulation ended: Agent did not complete the objective in time.")
except Exception as e:
print(f"\nAn error occurred in the simulation: {e}")
traceback.print_exc()Editor is loading...
Leave a Comment