Untitled
unknown
plain_text
6 months ago
3.3 kB
2
Indexable
# Bridge: read "sensor_value,voltage" lines from an Arduino over a serial
# port and republish each reading as a JSON message on an MQTT topic.

import glob
import json
import sys
import time

import paho.mqtt.client as mqtt
import serial

# MQTT settings
MQTT_BROKER = "YOUR_MQTT_BROKER_ADDRESS"
MQTT_TOPIC = "photoresistor/data"


def find_arduino_port():
    """Return the first candidate serial device path, or None if none exist.

    Candidates are /dev/ttyACM* followed by /dev/ttyUSB*. Each group is
    sorted so the chosen port is deterministic (glob order is arbitrary).
    """
    ports = sorted(glob.glob('/dev/ttyACM*')) + sorted(glob.glob('/dev/ttyUSB*'))
    print(f"Available ports: {ports}")
    if ports:
        return ports[0]
    return None


def connect_mqtt():
    """Create an MQTT client and connect it to MQTT_BROKER on port 1883.

    Returns:
        The connected client, or None if the connection attempt raised.
    """
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}")

    # paho-mqtt 2.x expects an explicit callback API version; VERSION1 keeps
    # the four-argument on_connect signature used above. Older releases have
    # no such enum and take no version argument.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is not None:
        client = mqtt.Client(api_versions.VERSION1)
    else:
        client = mqtt.Client()
    client.on_connect = on_connect

    try:
        client.connect(MQTT_BROKER, 1883)
    except Exception as e:
        print(f"Failed to connect to MQTT broker: {e}")
        return None
    return client


def read_sensor_data(ser):
    """Read one line from the serial port and parse it as two floats.

    Args:
        ser: An open serial connection providing ``readline()``.

    Returns:
        ``(sensor_value, voltage)`` as floats, or ``(None, None)`` when the
        line is empty, malformed, or cannot be decoded/parsed.

    Raises:
        serial.SerialException: Propagated so the caller can stop when the
            device itself fails, instead of retrying forever.
    """
    try:
        line = ser.readline().decode('utf-8').strip()
        print(f"Raw data: '{line}'")

        if not line:
            print("Received empty line from Arduino")
            return None, None

        fields = line.split(',')
        if len(fields) != 2:
            print(f"Unexpected data format: {line}")
            return None, None

        sensor_value, voltage = map(float, fields)
        return sensor_value, voltage
    except serial.SerialException:
        # A port-level failure is not a bad reading; let main() handle it.
        raise
    except ValueError as e:
        # Covers both float() failures and UnicodeDecodeError.
        print(f"Error parsing sensor data: {e}")
        return None, None
    except Exception as e:
        print(f"Unexpected error reading sensor data: {e}")
        return None, None


def main():
    """Run the bridge loop until interrupted or the serial port fails."""
    print("Starting Arduino-MQTT bridge...")
    print(f"Python version: {sys.version}")
    print(f"Pyserial version: {serial.__version__}")

    arduino_port = find_arduino_port()
    if not arduino_port:
        print("Arduino not found. Please check the connection.")
        return

    print(f"Arduino found at {arduino_port}")

    try:
        ser = serial.Serial(arduino_port, 9600, timeout=5)
        print("Serial connection established")
    except serial.SerialException as e:
        print(f"Failed to open serial port: {e}")
        return

    mqtt_client = connect_mqtt()
    if not mqtt_client:
        # Do not leave the serial port open on the early-exit path.
        ser.close()
        print("Failed to connect to MQTT broker. Exiting.")
        return

    mqtt_client.loop_start()

    try:
        while True:
            sensor_value, voltage = read_sensor_data(ser)
            if sensor_value is not None and voltage is not None:
                print(f"Sensor Value: {sensor_value}, Voltage: {voltage}V")

                # Publish to MQTT
                msg = json.dumps({"sensor_value": sensor_value, "voltage": voltage})
                result = mqtt_client.publish(MQTT_TOPIC, msg)
                if result.rc == mqtt.MQTT_ERR_SUCCESS:
                    print(f"Sent `{msg}` to topic `{MQTT_TOPIC}`")
                else:
                    print(f"Failed to send message to topic {MQTT_TOPIC}")
            else:
                print("Failed to read valid sensor data")

            time.sleep(1)
    except KeyboardInterrupt:
        print("Exiting...")
    except serial.SerialException as e:
        print(f"Serial error: {e}")
    finally:
        ser.close()
        # Disconnect before stopping the network loop so the broker sees a
        # clean DISCONNECT rather than a dropped connection.
        mqtt_client.disconnect()
        mqtt_client.loop_stop()


if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment