Untitled
def _monitor_sensors(self):
    """Poll the beam sensors and schedule a scan when a beam is broken.

    Meant to run in a separate thread. The loop keeps running until the
    instance gains a ``_destroyed`` attribute. On each pass, beams 0-3 are
    queried through ``self.sensor_system.is_beam_broken``; when a beam is
    broken and at least ``self.click_cooldown`` seconds have elapsed since
    ``self.last_click_time``, ``trigger_scan_from_sensor`` is handed to
    ``self.after(0, ...)`` so the scan is started from the main thread.

    Every pass ends with a sleep, including passes where no sensor system
    is attached, so a missing sensor system cannot turn the loop into a
    busy spin. Errors raised while polling are printed and followed by a
    longer pause before the next attempt.
    """
    beam_count = 4
    poll_interval = 0.25  # seconds between regular polls
    error_backoff = 1  # seconds to wait after a failed poll

    while not hasattr(self, '_destroyed'):
        pause = poll_interval
        try:
            # Read the attribute once per pass so the truthiness check and
            # the beam queries operate on the same object.
            sensors = self.sensor_system
            if sensors:
                for beam in range(beam_count):
                    if not sensors.is_beam_broken(beam):
                        continue
                    # Only trigger when the cooldown has elapsed; the
                    # triggered method re-checks it before acting.
                    if time.time() - self.last_click_time >= self.click_cooldown:
                        print(f"Beam {beam} broken - triggering scan")
                        # Schedule scan on main thread
                        self.after(0, self.trigger_scan_from_sensor)
                        break
        except Exception as e:
            print(f"Error in sensor monitoring: {e}")
            pause = error_backoff
        time.sleep(pause)


def trigger_scan_from_sensor(self):
    """Start a classification run in response to a sensor trigger.

    Similar to the manual press flow but without animation. The call is a
    no-op while the click cooldown is still active. Otherwise
    ``self.last_click_time`` is updated and, provided the turntable is
    initialized and a classifier is present, a loading screen is shown and
    classification is started 100 ms later in a daemon thread.

    The worker thread takes a picture, classifies it, moves the turntable
    to the matching bin (when one is found) and then schedules the result
    screen through ``self.after``. Failures are printed; the loading screen
    is removed and an error dialog is shown where the UI is available.
    """
    now = time.time()
    if now - self.last_click_time < self.click_cooldown:
        return
    self.last_click_time = now

    # Check if turntable is initialized
    if not self.turntable or not self.turntable.is_initialized:
        print("Turntable not available or not initialized")
        return

    if not self.classifier:
        print(TRANSLATIONS[self.winfo_toplevel().LANGUAGE]['camera_error'])
        return

    loading_screen = None
    try:
        print("Sensor triggered - starting classification")
        root = self.winfo_toplevel()

        loading_screen = LoadingScreen(
            root,
            message=TRANSLATIONS[root.LANGUAGE]['scanning_object'],
            dark_mode=root.DARK_MODE,
            language=root.LANGUAGE,
        )

        def show_error_dialog():
            # Language and dark mode are read when the dialog is built, not
            # when the scan was triggered.
            MessageDialog(
                root,
                TRANSLATIONS[root.LANGUAGE]['classification_error'],
                root.DARK_MODE,
            )

        def show_result(result):
            loading_screen.destroy()
            ClassificationResultScreen(
                root,
                result,
                root.DARK_MODE,
                root.LANGUAGE,
            )

        def classify_in_background():
            # Runs in the worker thread; UI work is handed back via after().
            try:
                image_path = self.classifier.take_picture()
                result = self.classifier.classify_image(image_path)
                bin_id = self.bin_config.get_bin_id_by_name(result)
                # NOTE(review): a falsy bin id (e.g. 0) skips the move -
                # confirm that bin ids are never 0.
                if bin_id:
                    self.turntable.move_to_bin(bin_id)
                self.after(0, lambda: show_result(result))
            except Exception as e:
                print(f"Error during classification: {str(e)}")
                self.after(0, loading_screen.destroy)
                self.after(0, show_error_dialog)

        def start_worker():
            try:
                threading.Thread(
                    target=classify_in_background,
                    daemon=True,
                ).start()
            except Exception as e:
                print(f"Error setting up classification: {str(e)}")
                loading_screen.destroy()
                show_error_dialog()

        # Delay the start slightly, as the original flow did, before
        # kicking off the worker.
        self.after(100, start_worker)
    except Exception as e:
        print(f"Error setting up classification: {str(e)}")
        # Do not leave an orphaned loading screen behind when scheduling
        # fails after the screen was already created.
        if loading_screen is not None:
            loading_screen.destroy()
Leave a Comment