Untitled
unknown
plain_text
a year ago
3.3 kB
6
Indexable
import serial
import time
import paho.mqtt.client as mqtt
import json
import glob
import sys
# MQTT settings
MQTT_BROKER = "YOUR_MQTT_BROKER_ADDRESS"
MQTT_TOPIC = "photoresistor/data"
def find_arduino_port():
ports = glob.glob('/dev/ttyACM*') + glob.glob('/dev/ttyUSB*')
print(f"Available ports: {ports}")
if ports:
return ports[0]
return None
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print(f"Failed to connect, return code {rc}")
client = mqtt.Client()
client.on_connect = on_connect
try:
client.connect(MQTT_BROKER, 1883)
return client
except Exception as e:
print(f"Failed to connect to MQTT broker: {e}")
return None
def read_sensor_data(ser):
try:
line = ser.readline().decode('utf-8').strip()
print(f"Raw data: '{line}'")
if not line:
print("Received empty line from Arduino")
return None, None
if ',' not in line:
print(f"Unexpected data format: {line}")
return None, None
sensor_value, voltage = map(float, line.split(','))
return sensor_value, voltage
except ValueError as e:
print(f"Error parsing sensor data: {e}")
return None, None
except Exception as e:
print(f"Unexpected error reading sensor data: {e}")
return None, None
def main():
print("Starting Arduino-MQTT bridge...")
print(f"Python version: {sys.version}")
print(f"Pyserial version: {serial.__version__}")
arduino_port = find_arduino_port()
if not arduino_port:
print("Arduino not found. Please check the connection.")
return
print(f"Arduino found at {arduino_port}")
try:
ser = serial.Serial(arduino_port, 9600, timeout=5)
print("Serial connection established")
except serial.SerialException as e:
print(f"Failed to open serial port: {e}")
return
mqtt_client = connect_mqtt()
if not mqtt_client:
print("Failed to connect to MQTT broker. Exiting.")
return
mqtt_client.loop_start()
try:
while True:
sensor_value, voltage = read_sensor_data(ser)
if sensor_value is not None and voltage is not None:
print(f"Sensor Value: {sensor_value}, Voltage: {voltage}V")
# Publish to MQTT
msg = json.dumps({"sensor_value": sensor_value, "voltage": voltage})
result = mqtt_client.publish(MQTT_TOPIC, msg)
status = result[0]
if status == 0:
print(f"Sent `{msg}` to topic `{MQTT_TOPIC}`")
else:
print(f"Failed to send message to topic {MQTT_TOPIC}")
else:
print("Failed to read valid sensor data")
time.sleep(1)
except KeyboardInterrupt:
print("Exiting...")
except serial.SerialException as e:
print(f"Serial error: {e}")
finally:
ser.close()
mqtt_client.loop_stop()
if __name__ == "__main__":
main()Editor is loading...
Leave a Comment