Untitled

 avatar
unknown
plain_text
a month ago
6.4 kB
3
Indexable
import time
import board
import busio
import adafruit_vl53l0x
from adafruit_tca9548a import TCA9548A
from adafruit_pca9685 import PCA9685
from adafruit_motor import servo
import statistics
import sys
import tty
import termios

# Initialize I2C buses
i2c_mux = busio.I2C(board.SCL, board.SDA)  # For multiplexer
i2c_servo = busio.I2C(board.SCL, board.SDA)  # For servo controller

# Initialize multiplexer
mux = TCA9548A(i2c_mux)

# Initialize PCA9685 directly on I2C bus (address 0x40)
pca = PCA9685(i2c_servo, address=0x40)
pca.frequency = 50  # Standard servo frequency

# Initialize servo
servo_motor = servo.Servo(pca.channels[0], min_pulse=1300, max_pulse=3600)
servo_motor.angle = 0  # Set to home position

# Create sensor objects on all 8 channels (0-7)
sensors = {
    channel: adafruit_vl53l0x.VL53L0X(mux[channel])
    for channel in range(8)
}

# Constants
MAX_DISTANCE = 2800
MIN_DISTANCE = 0
FULL_THRESHOLD = 50
MEASUREMENT_TIME = 1.5
SAMPLE_DELAY = 0.02

def print_separator():
    print("\n" + "-" * 40 + "\n")

def getch():
    """Get a single character from standard input"""
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        tty.setraw(sys.stdin.fileno())
        ch = sys.stdin.read(1)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    return ch

def get_measurements():
    """Collect readings from all sensors for the specified time period"""
    readings_per_sensor = {}
    end_time = time.time() + MEASUREMENT_TIME
    
    print("Taking measurements...")
    while time.time() < end_time:
        for sensor_id, sensor in sensors.items():
            try:
                reading = sensor.range
                if sensor_id not in readings_per_sensor:
                    readings_per_sensor[sensor_id] = []
                readings_per_sensor[sensor_id].append(reading)
            except Exception:
                continue
        time.sleep(SAMPLE_DELAY)
    
    # Print raw readings for each sensor
    print("\nRaw sensor readings (mm):")
    for sensor_id, readings in readings_per_sensor.items():
        if readings:
            avg_reading = sum(readings) / len(readings)
            print(f"Sensor {sensor_id}: {len(readings)} readings, avg: {avg_reading:.1f}mm")
        else:
            print(f"Sensor {sensor_id}: No valid readings")
    
    return readings_per_sensor

def process_readings(readings_per_sensor):
    """Process all collected readings and return the final percentage"""
    if not readings_per_sensor:
        return None, None
    
    # Process each sensor's readings
    sensor_medians = []
    print("\nProcessed readings:")
    
    for sensor_id, sensor_readings in readings_per_sensor.items():
        if sensor_readings:
            # Remove outliers: readings outside 1.5 standard deviations
            if len(sensor_readings) >= 3:
                mean = statistics.mean(sensor_readings)
                stdev = statistics.stdev(sensor_readings)
                filtered = [r for r in sensor_readings if (mean - 1.5 * stdev) <= r <= (mean + 1.5 * stdev)]
                if filtered:
                    median = statistics.median(filtered)
                    sensor_medians.append(median)
                    print(f"Sensor {sensor_id}: Median (after filtering): {median:.1f}mm")
            else:
                median = statistics.median(sensor_readings)
                sensor_medians.append(median)
                print(f"Sensor {sensor_id}: Median: {median:.1f}mm")
    
    if not sensor_medians:
        return None, None
        
    closest_reading = min(sensor_medians)
    median_distance = statistics.median(sensor_medians)
    percentage = ((MAX_DISTANCE - median_distance) / (MAX_DISTANCE - MIN_DISTANCE)) * 100
    percentage = max(0, min(100, percentage))
    
    print(f"\nCalculation details:")
    print(f"Closest reading: {closest_reading:.1f}mm")
    print(f"Median distance across all sensors: {median_distance:.1f}mm")
    print(f"Calculated percentage: {percentage:.1f}%")
    
    return closest_reading, percentage

def rotate_servo():
    """Rotate servo 90 degrees and back"""
    print("Rotating servo...")
    
    # Rotate to 90 degrees
    servo_motor.angle = 90
    print("Position: 90°")
    time.sleep(1)
    
    # Return to home position
    servo_motor.angle = 0
    print("Position: Home (0°)")
    
    print("Rotation complete")

def main():
    # Give sensors time to initialize
    time.sleep(1)
    print("\nFill Level Measurement System")
    print_separator()
    print("Controls:")
    print("'s' - Measure fill level")
    print("'m' - Move servo")
    print("'q' - Quit")
    print_separator()
    
    while True:
        char = getch()
        print_separator()
        
        if char.lower() == 'q':
            print("Shutting down...")
            # Return servo to home position before exit
            servo_motor.angle = 0
            print("Goodbye!")
            break
            
        elif char.lower() == 's':
            print("Starting measurement cycle...")
            readings = get_measurements()
            closest_reading, percentage = process_readings(readings)
            
            if closest_reading is None:
                print("Error: Could not get valid readings from sensors")
            else:
                print_separator()
                if closest_reading <= FULL_THRESHOLD:
                    print("Status: BIN FULL")
                else:
                    print(f"Final Fill level: {percentage:.1f}%")
                    
        elif char.lower() == 'm':
            rotate_servo()
            
        else:
            print("Invalid command!")
            
        print_separator()
        print("Ready for next command (s/m/q)")

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"\nAn error occurred: {str(e)}")
        # Ensure servo returns to home position on error
        try:
            servo_motor.angle = 0
        except:
            pass
    finally:
        # Cleanup
        print("\nCleaning up...")
        try:
            pca.deinit()
        except:
            pass
Leave a Comment