Untitled
unknown
plain_text
a year ago
2.6 kB
24
Indexable
import time
import VisionFive.gpio as GPIO
from sensirion_i2c_driver import I2cConnection
from sensirion_i2c_sht.sht3x import Sht3xI2cDevice
from sensirion_i2c_driver.linux_i2c_transceiver import LinuxI2cTransceiver
import paho.mqtt.client as mqtt
# Setup GPIO for LEDs
LED_RED = 40
LED_GREEN = 38
LED_BLUE = 36
SIGNAL_PIN = 35
GPIO.setmode(GPIO.BOARD)
GPIO.setup(LED_RED, GPIO.OUT)
GPIO.setup(LED_GREEN, GPIO.OUT)
GPIO.setup(LED_BLUE, GPIO.OUT)
GPIO.setup(SIGNAL_PIN, GPIO.OUT)
i2c_transceiver = LinuxI2cTransceiver('/dev/i2c-0')
sht3x = Sht3xI2cDevice(I2cConnection(i2c_transceiver))
# MQTT setup
BROKER = "broker.emqx.io"
PORT = 1883
TOPIC_TEMP = "sensor/temperature"
TOPIC_HUMID = "sensor/humidity"
# MQTT callback functions
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print(f"Failed to connect, return code {rc}")
def on_publish(client, userdata, mid):
print(f"Message {mid} published.")
# Set up MQTT client
client = mqtt.Client()
client.on_connect = on_connect
client.on_publish = on_publish
client.connect(BROKER, PORT, 60)
client.loop_start()
try:
while True:
temperature, humidity = sht3x.single_shot_measurement()
temp_value = temperature.degrees_celsius
humid_value = humidity.percent_rh
GPIO.output(LED_RED, GPIO.LOW)
GPIO.output(LED_GREEN, GPIO.LOW)
GPIO.output(LED_BLUE, GPIO.LOW)
GPIO.output(SIGNAL_PIN, GPIO.LOW)
if temp_value > 80:
GPIO.output(LED_RED, GPIO.HIGH)
elif temp_value < 40:
GPIO.output(LED_BLUE, GPIO.HIGH)
GPIO.output(SIGNAL_PIN, GPIO.HIGH)
else:
GPIO.output(LED_GREEN, GPIO.HIGH)
print("SUCCESS! {:0.2f} C, {:0.2f} %RH".format(
temp_value,
humid_value))
# Publish temperature and humidity to MQTT
result_temp = client.publish(TOPIC_TEMP, "{:0.2f}".format(temp_value))
result_humid = client.publish(TOPIC_HUMID, "{:0.2f}".format(humid_value))
if result_temp.rc == 0:
print(f"Sent temperature to topic `{TOPIC_TEMP}`")
else:
print(f"Failed to send temperature to topic {TOPIC_TEMP}")
if result_humid.rc == 0:
print(f"Sent humidity to topic `{TOPIC_HUMID}`")
else:
print(f"Failed to send humidity to topic {TOPIC_HUMID}")
time.sleep(1.0)
finally:
GPIO.cleanup()
client.loop_stop()
client.disconnect()
Editor is loading...
Leave a Comment