# --- Paste-site page header (not part of the program) ---
# Untitled
#
# avatar
# unknown
# plain_text
# 4 months ago
# 3.8 kB
# 2
# Indexable
import time
import board
import busio
import adafruit_vl53l0x
from adafruit_tca9548a import TCA9548A
import statistics
import sys
import tty
import termios

# Bring up the shared I2C bus and the TCA9548A multiplexer attached to it.
i2c = busio.I2C(board.SCL, board.SDA)
mux = TCA9548A(i2c)

# One VL53L0X object per multiplexer channel in use (1, 2 and 4-7),
# keyed by the channel number it is reached through.
sensors = {
    channel: adafruit_vl53l0x.VL53L0X(mux[channel])
    for channel in (1, 2, 4, 5, 6, 7)
}

# Constants
# NOTE(review): distances are presumably in millimetres, the unit of the
# sensor's range reading — confirm against the sensor library.
# A median distance equal to MAX_DISTANCE maps to 0% fill in process_readings().
MAX_DISTANCE = 400
# A median distance equal to MIN_DISTANCE maps to 100% fill in process_readings().
MIN_DISTANCE = 0
# main() reports "Bin full" when the closest per-sensor median is at or below this.
FULL_THRESHOLD = 50
MEASUREMENT_TIME = 1.5  # Total time to collect readings in seconds
SAMPLE_DELAY = 0.02    # 20ms between reading cycles

def getch():
    """Read exactly one character from stdin without waiting for Enter.

    The terminal is switched to raw mode for the duration of the read and
    its previous settings are always restored afterwards, even if the
    read fails.

    Returns:
        The single character read from standard input.
    """
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    try:
        tty.setraw(stdin_fd)
        return sys.stdin.read(1)
    finally:
        # Put the terminal back exactly as we found it.
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)

def get_measurements(duration=None, sample_delay=None, sensor_map=None):
    """Poll every sensor repeatedly for a fixed period and collect the readings.

    Each pass reads the ``range`` attribute of every sensor once, then
    sleeps for ``sample_delay`` seconds before the next pass.

    Args:
        duration: Seconds to keep polling. Defaults to ``MEASUREMENT_TIME``.
        sample_delay: Seconds to sleep after each pass over all sensors.
            Defaults to ``SAMPLE_DELAY``.
        sensor_map: Mapping of sensor id to an object exposing a ``range``
            attribute. Defaults to the module-level ``sensors``.

    Returns:
        A dict mapping sensor id to the list of readings taken from that
        sensor, in acquisition order. A sensor whose reads all failed has
        no entry in the dict.
    """
    if duration is None:
        duration = MEASUREMENT_TIME
    if sample_delay is None:
        sample_delay = SAMPLE_DELAY
    if sensor_map is None:
        sensor_map = sensors

    readings_per_sensor = {}
    # Use the monotonic clock for the deadline: the wall clock (time.time())
    # can jump when the system time is adjusted, which would shorten or
    # stretch the sampling window.
    deadline = time.monotonic() + duration

    while time.monotonic() < deadline:
        for sensor_id, sensor in sensor_map.items():
            try:
                reading = sensor.range
            except Exception:
                # Best effort: a failed read on one sensor must not abort
                # the whole measurement; just skip this sample.
                continue
            readings_per_sensor.setdefault(sensor_id, []).append(reading)
        time.sleep(sample_delay)

    return readings_per_sensor

def process_readings(readings_per_sensor):
    """Reduce the raw per-sensor readings to a closest distance and a fill percentage.

    For each sensor with at least three readings, values further than 1.5
    sample standard deviations from the mean are discarded and the median
    of the remainder is kept; with fewer readings the plain median is used.

    Args:
        readings_per_sensor: Mapping of sensor id to a list of readings.

    Returns:
        A ``(closest_reading, percentage)`` tuple, where ``closest_reading``
        is the smallest per-sensor median and ``percentage`` is the fill
        level derived from the median of the per-sensor medians, clamped to
        0-100. Returns ``(None, None)`` when no usable readings exist.
    """
    if not readings_per_sensor:
        return None, None

    per_sensor_medians = []
    for samples in readings_per_sensor.values():
        if not samples:
            continue

        # Too few samples for a meaningful spread estimate: take them as-is.
        if len(samples) < 3:
            per_sensor_medians.append(statistics.median(samples))
            continue

        centre = statistics.mean(samples)
        spread = statistics.stdev(samples)
        lower = centre - 1.5 * spread
        upper = centre + 1.5 * spread
        kept = [value for value in samples if lower <= value <= upper]
        if kept:
            per_sensor_medians.append(statistics.median(kept))

    if not per_sensor_medians:
        return None, None

    overall_median = statistics.median(per_sensor_medians)
    span = MAX_DISTANCE - MIN_DISTANCE
    fill = ((MAX_DISTANCE - overall_median) / span) * 100
    fill = max(0, min(100, fill))  # keep the result within 0-100%

    return min(per_sensor_medians), fill

def main():
    """Run the interactive measure-on-keypress loop.

    Waits for a key press, then collects and processes sensor readings and
    prints either "Bin full" or the fill percentage. The loop ends when
    'q' / 'Q', Ctrl-C or Ctrl-D is pressed, or when stdin is closed.
    """
    # Give sensors time to initialize
    time.sleep(1)
    print("Press any key to measure, 'q' to quit")

    while True:
        char = getch()
        # getch() reads in raw mode, so Ctrl-C and Ctrl-D arrive as the plain
        # characters '\x03' and '\x04' instead of raising KeyboardInterrupt or
        # signalling EOF; an empty string means stdin was closed. Treat all of
        # these as a request to quit rather than as "any key".
        if char in ('', '\x03', '\x04') or char.lower() == 'q':
            print("\nExiting...")
            break

        readings = get_measurements()
        closest_reading, percentage = process_readings(readings)

        if closest_reading is None:
            print("Error: Could not get valid readings from sensors")
            continue

        # "Full" is decided by the closest sensor, not by the percentage.
        if closest_reading <= FULL_THRESHOLD:
            print("Bin full")
        else:
            print(f"Fill level: {percentage:.1f}%")

if __name__ == "__main__":
    # Script entry point: report any failure from main() as a one-line
    # message instead of a traceback.
    try:
        main()
    except Exception as err:
        print(f"\nAn error occurred: {err!s}")
# --- Paste-site page footer (not part of the program) ---
# Editor is loading...
# Leave a Comment