Untitled

 avatar
unknown
plain_text
a month ago
5.6 kB
4
Indexable
import time
import board
import busio
import adafruit_vl53l0x
from adafruit_tca9548a import TCA9548A
from adafruit_pca9685 import PCA9685
from adafruit_motor import servo
import statistics
import sys
import tty
import termios

# Initialize I2C buses
i2c_mux = busio.I2C(board.SCL, board.SDA)  # For multiplexer
i2c_servo = busio.I2C(board.SCL, board.SDA)  # For servo controller

# Initialize multiplexer
mux = TCA9548A(i2c_mux)

# Initialize PCA9685 directly on I2C bus (address 0x40)
pca = PCA9685(i2c_servo, address=0x40)
pca.frequency = 50  # Standard servo frequency

# Initialize servo
servo_motor = servo.Servo(pca.channels[0], min_pulse=1300, max_pulse=3600)
servo_motor.angle = 0  # Set to home position

# Create sensor objects on channels 1, 2, and 4-7
sensors = {
    1: adafruit_vl53l0x.VL53L0X(mux[1]),
    2: adafruit_vl53l0x.VL53L0X(mux[2]),
    4: adafruit_vl53l0x.VL53L0X(mux[4]),
    5: adafruit_vl53l0x.VL53L0X(mux[5]),
    6: adafruit_vl53l0x.VL53L0X(mux[6])
}

# Constants
MAX_DISTANCE = 2800
MIN_DISTANCE = 0
FULL_THRESHOLD = 50
MEASUREMENT_TIME = 1.5
SAMPLE_DELAY = 0.02

def print_separator():
    print("\n" + "-" * 40 + "\n")

def getch():
    """Get a single character from standard input"""
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        tty.setraw(sys.stdin.fileno())
        ch = sys.stdin.read(1)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    return ch

def get_measurements():
    """Collect readings from all sensors for the specified time period"""
    readings_per_sensor = {}
    end_time = time.time() + MEASUREMENT_TIME
    
    print("Taking measurements...")
    while time.time() < end_time:
        for sensor_id, sensor in sensors.items():
            try:
                reading = sensor.range
                if sensor_id not in readings_per_sensor:
                    readings_per_sensor[sensor_id] = []
                readings_per_sensor[sensor_id].append(reading)
            except Exception:
                continue
        time.sleep(SAMPLE_DELAY)
    
    return readings_per_sensor

def process_readings(readings_per_sensor):
    """Process all collected readings and return the final percentage"""
    if not readings_per_sensor:
        return None, None
    
    # Process each sensor's readings
    sensor_medians = []
    for sensor_readings in readings_per_sensor.values():
        if sensor_readings:
            # Remove outliers: readings outside 1.5 standard deviations
            if len(sensor_readings) >= 3:
                mean = statistics.mean(sensor_readings)
                stdev = statistics.stdev(sensor_readings)
                filtered = [r for r in sensor_readings if (mean - 1.5 * stdev) <= r <= (mean + 1.5 * stdev)]
                if filtered:
                    sensor_medians.append(statistics.median(filtered))
            else:
                sensor_medians.append(statistics.median(sensor_readings))
    
    if not sensor_medians:
        return None, None
        
    closest_reading = min(sensor_medians)
    median_distance = statistics.median(sensor_medians)
    percentage = ((MAX_DISTANCE - median_distance) / (MAX_DISTANCE - MIN_DISTANCE)) * 100
    percentage = max(0, min(100, percentage))
    
    return closest_reading, percentage

def rotate_servo():
    """Rotate servo 90 degrees and back"""
    print("Rotating servo...")
    
    # Rotate to 90 degrees
    servo_motor.angle = 90
    print("Position: 90°")
    time.sleep(1)
    
    # Return to home position
    servo_motor.angle = 0
    print("Position: Home (0°)")
    
    print("Rotation complete")

def main():
    # Give sensors time to initialize
    time.sleep(1)
    print("\nFill Level Measurement System")
    print_separator()
    print("Controls:")
    print("'s' - Measure fill level")
    print("'m' - Move servo")
    print("'q' - Quit")
    print_separator()
    
    while True:
        char = getch()
        print_separator()
        
        if char.lower() == 'q':
            print("Shutting down...")
            # Return servo to home position before exit
            servo_motor.angle = 0
            print("Goodbye!")
            break
            
        elif char.lower() == 's':
            print("Starting measurement cycle...")
            readings = get_measurements()
            closest_reading, percentage = process_readings(readings)
            
            if closest_reading is None:
                print("Error: Could not get valid readings from sensors")
            else:
                print_separator()
                if closest_reading <= FULL_THRESHOLD:
                    print("Status: BIN FULL")
                else:
                    print(f"Fill level: {percentage:.1f}%")
                    
        elif char.lower() == 'm':
            rotate_servo()
            
        else:
            print("Invalid command!")
            
        print_separator()
        print("Ready for next command (s/m/q)")

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"\nAn error occurred: {str(e)}")
        # Ensure servo returns to home position on error
        try:
            servo_motor.angle = 0
        except:
            pass
    finally:
        # Cleanup
        print("\nCleaning up...")
        try:
            pca.deinit()
        except:
            pass
Editor is loading...
Leave a Comment