Untitled

 avatar
unknown
plain_text
2 months ago
12 kB
4
Indexable
class TurntableController:
    def __init__(self):
        self.is_initialized = False

        # GPIO setup
        self.direction_pin = OutputDevice(20)
        self.pulse_pin = OutputDevice(21)
        self.hall_sensor = Button(18, pull_up=True)
        self.enable_plus = OutputDevice(23)    
        self.enable_minus = OutputDevice(24)   
        
        self.hardware = HardwareController()
        
        # Add tracking variables for auto-recalibration
        self.operation_count = 0
        self.OPERATIONS_BEFORE_CALIBRATION = 20
        self.CALIBRATION_DELAY = 30  # seconds
        self.recalibration_timer = None

        # Position tracking
        self.current_position = 0  # 0 = home position
        self.is_calibrated = False
        self.positions = {
            'bin1': 0,      # Home position (0 degrees)
            'bin2': 90,     # 90 degrees
            'bin3': 180,    # 180 degrees
            'bin4': 270     # 270 degrees
        }
        
        # Motor parameters from smooth code
        self.max_speed = 800      # Steps per second
        self.acceleration = 200   # More conservative acceleration
        self.min_pulse_width = 3e-6  # 3μs for TB6600
        self.enable_state = False
        self.steps_per_revolution = 1600
        
        # Movement queue and thread
        self.movement_queue = queue.Queue()
        self.movement_thread = threading.Thread(target=self._movement_worker, daemon=True)
        self.movement_thread.start()

        # Disable motor on startup
        self.disable_motor()

    def initialize_hardware(self):
        """Initialize hardware components"""
        self.direction_pin = OutputDevice(20)
        self.pulse_pin = OutputDevice(21)
        self.hall_sensor = Button(18, pull_up=True)
        self.hardware = HardwareController()
        self.calibrate()

    def enable_motor(self, state=True):
        """Enable the driver with proper polarity for TB6600"""
        self.enable_plus.value = not state  # ENA+ should be opposite of ENA-
        self.enable_minus.value = state
        self.enable_state = state
        time.sleep(0.01)  # Allow time for driver enable

    def disable_motor(self):
        """Disable motor power"""
        self.enable_motor(False)

    def _move_segment(self, steps, direction, start_delay, end_delay):
        """Core movement function with linear speed ramp - exact from smooth code"""
        if steps == 0:
            return

        delay_increment = (end_delay - start_delay) / steps
        current_delay = start_delay

        for _ in range(steps):
            self.pulse_pin.on()
            time.sleep(self.min_pulse_width)
            self.pulse_pin.off()
            time.sleep(max(current_delay, self.min_pulse_width))
            current_delay += delay_increment

    def _move_motor(self, steps, direction):
        """Move motor with smooth acceleration - exact from smooth code"""
        if not self.enable_state:
            self.enable_motor(True)
            
        self.direction_pin.value = direction

        # Simplified acceleration profile from smooth code
        accel_steps = min(steps//2, 200)  # Limit acceleration steps
        cruise_steps = steps - 2*accel_steps
        
        # Acceleration phase
        self._move_segment(accel_steps, direction, 0.002, 0.0005)
        
        # Cruise phase (if any steps remain)
        if cruise_steps > 0:
            self._move_segment(cruise_steps, direction, 0.0005, 0.0005)
        
        # Deceleration phase
        self._move_segment(accel_steps, direction, 0.0005, 0.002)
        
        # Update position tracking
        self.current_position += steps if direction else -steps

    def calibrate(self):
        """Find home position using hall sensor"""
        print("Calibrating turntable...")
        
        try:
            # Enable motor for calibration
            self.enable_motor(True)
            time.sleep(0.1)  # Short delay
            
            # If not at home, search for it
            self.direction_pin.value = True  # CW direction
            attempts = 0
            
            while not self.hall_sensor.is_pressed:
                self.pulse_pin.on()
                time.sleep(0.002)  # Slower movement for calibration
                self.pulse_pin.off()
                time.sleep(0.002)
                attempts += 1
                
                if attempts % 100 == 0:  # Print progress every 100 steps
                    print(f"Searching for home position... {attempts} steps")
            
            print("Home position found!")
            self.current_position = 0
            self.is_calibrated = True
            self.is_initialized = True

        finally:
            # Always disable motor after calibration
            time.sleep(0.1)  # Short delay before disable
            self.disable_motor()

    def _calculate_steps(self, target_position):
        """Calculate steps needed to reach target position"""
        # Calculate shortest path to target
        diff = target_position - self.current_position
        if abs(diff) > 180:
            if diff > 0:
                diff -= 360
            else:
                diff += 360
                
        # Add slight overcompensation for mechanical losses
        compensation_factor = 1.02  # 2% extra steps
        steps = int(abs(diff) * self.steps_per_revolution * compensation_factor / 360)
        direction = diff > 0
        
        return steps, direction

    def _movement_worker(self):
        """Worker thread to process movement queue"""
        while True:
            try:
                target_bin = self.movement_queue.get()
                if not self.is_calibrated:
                    self.calibrate()
                
                # Get target position in degrees
                target_position = self.positions[target_bin]
                
                # Calculate required movement
                steps, direction = self._calculate_steps(target_position)
                
                # Execute movement
                self._move_motor(steps, direction)
                
                # Update current position
                self.current_position = target_position
                print(f"Moved to {target_bin} at position {target_position}")
                
                # After movement is complete, rotate servo and measure fill level
                self.hardware.rotate_and_measure(target_bin)
                
                self.movement_queue.task_done()
                
            except Exception as e:
                print(f"Error in movement worker: {e}")
                continue

    def move_to_bin(self, bin_id):
        """Queue movement to specified bin"""
        if not bin_id in self.positions:
            print(f"Invalid bin ID: {bin_id}")
            return

        # Increment operation count
        self.operation_count += 1
        print(f"Operation count: {self.operation_count}")

        # Queue the movement
        self.movement_queue.put(bin_id)

        # Check if we need to schedule recalibration
        if self.operation_count >= self.OPERATIONS_BEFORE_CALIBRATION:
            print("Scheduling auto-recalibration")
            self._schedule_auto_recalibration()

    def move_to_bin_without_measure(self, bin_id):
        """Move to bin position without measuring"""
        if not bin_id in self.positions:
            print(f"Invalid bin ID: {bin_id}")
            return

        # Put only the movement in the queue, without measurement
        def movement_only():
            try:
                if not self.is_calibrated:
                    self.calibrate()
                
                target_position = self.positions[bin_id]
                steps, direction = self._calculate_steps(target_position)
                self._move_motor(steps, direction)
                self.current_position = target_position
                print(f"Moved to {bin_id} at position {target_position}")
                
            except Exception as e:
                print(f"Error in movement: {e}")
        
        self.movement_queue.put(movement_only)

    def move_to_home(self):
        """Move turntable to home position"""
        self.move_to_bin('bin1')  # bin1 is home position

    def cleanup(self):
        """Cleanup hardware resources"""
        try:
            self.hardware.cleanup()
        except:
            pass

    def _schedule_auto_recalibration(self):
        """Schedule automatic recalibration after delay"""
        def check_and_recalibrate():
            # Check if any movement is in progress
            if self.movement_queue.empty():
                try:
                    # Find root window to access language and dark mode
                    root = None
                    for instance in CircularProgress._instances:
                        if not instance._destroyed:
                            root = instance.winfo_toplevel()
                            break

                    if root:
                        # Create loading screen
                        loading_screen = LoadingScreen(
                            root,
                            message=TRANSLATIONS[root.LANGUAGE]['calibrating'],
                            dark_mode=root.DARK_MODE,
                            language=root.LANGUAGE
                        )

                        def perform_calibration():
                            try:
                                # Run calibration
                                self.calibrate()
                                # Reset operation count
                                self.operation_count = 0
                                root.after(0, loading_screen.destroy)
                            except Exception as e:
                                print(f"Error during auto-recalibration: {e}")
                                root.after(0, loading_screen.destroy)
                                if root:
                                    root.after(0, lambda: MessageDialog(
                                        root,
                                        TRANSLATIONS[root.LANGUAGE]['calibration_failed'],
                                        root.DARK_MODE
                                    ))

                        # Start calibration in separate thread
                        thread = threading.Thread(target=perform_calibration)
                        thread.daemon = True
                        thread.start()

                except Exception as e:
                    print(f"Error scheduling auto-recalibration: {e}")

        # Cancel any existing timer
        if self.recalibration_timer:
            try:
                for instance in CircularProgress._instances:
                    if not instance._destroyed:
                        instance.after_cancel(self.recalibration_timer)
                        break
            except:
                pass

        # Schedule new timer
        for instance in CircularProgress._instances:
            if not instance._destroyed:
                self.recalibration_timer = instance.after(
                    int(self.CALIBRATION_DELAY * 1000),
                    check_and_recalibrate
                )
                break
Editor is loading...
Leave a Comment