Untitled
unknown
plain_text
a year ago
2.6 kB
21
Indexable
import time import VisionFive.gpio as GPIO from sensirion_i2c_driver import I2cConnection from sensirion_i2c_sht.sht3x import Sht3xI2cDevice from sensirion_i2c_driver.linux_i2c_transceiver import LinuxI2cTransceiver import paho.mqtt.client as mqtt # Setup GPIO for LEDs LED_RED = 40 LED_GREEN = 38 LED_BLUE = 36 SIGNAL_PIN = 35 GPIO.setmode(GPIO.BOARD) GPIO.setup(LED_RED, GPIO.OUT) GPIO.setup(LED_GREEN, GPIO.OUT) GPIO.setup(LED_BLUE, GPIO.OUT) GPIO.setup(SIGNAL_PIN, GPIO.OUT) i2c_transceiver = LinuxI2cTransceiver('/dev/i2c-0') sht3x = Sht3xI2cDevice(I2cConnection(i2c_transceiver)) # MQTT setup BROKER = "broker.emqx.io" PORT = 1883 TOPIC_TEMP = "sensor/temperature" TOPIC_HUMID = "sensor/humidity" # MQTT callback functions def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print(f"Failed to connect, return code {rc}") def on_publish(client, userdata, mid): print(f"Message {mid} published.") # Set up MQTT client client = mqtt.Client() client.on_connect = on_connect client.on_publish = on_publish client.connect(BROKER, PORT, 60) client.loop_start() try: while True: temperature, humidity = sht3x.single_shot_measurement() temp_value = temperature.degrees_celsius humid_value = humidity.percent_rh GPIO.output(LED_RED, GPIO.LOW) GPIO.output(LED_GREEN, GPIO.LOW) GPIO.output(LED_BLUE, GPIO.LOW) GPIO.output(SIGNAL_PIN, GPIO.LOW) if temp_value > 80: GPIO.output(LED_RED, GPIO.HIGH) elif temp_value < 40: GPIO.output(LED_BLUE, GPIO.HIGH) GPIO.output(SIGNAL_PIN, GPIO.HIGH) else: GPIO.output(LED_GREEN, GPIO.HIGH) print("SUCCESS! {:0.2f} C, {:0.2f} %RH".format( temp_value, humid_value)) # Publish temperature and humidity to MQTT result_temp = client.publish(TOPIC_TEMP, "{:0.2f}".format(temp_value)) result_humid = client.publish(TOPIC_HUMID, "{:0.2f}".format(humid_value)) if result_temp.rc == 0: print(f"Sent temperature to topic `{TOPIC_TEMP}`") else: print(f"Failed to send temperature to topic {TOPIC_TEMP}") if result_humid.rc == 0: print(f"Sent humidity to topic `{TOPIC_HUMID}`") else: print(f"Failed to send humidity to topic {TOPIC_HUMID}") time.sleep(1.0) finally: GPIO.cleanup() client.loop_stop() client.disconnect()
Editor is loading...
Leave a Comment