# Untitled
# Pasted from pastecode.io (author: unknown, syntax: plain_text, size: 2.4 kB).
import time
import VisionFive.gpio as GPIO
import serial
from sensirion_i2c_driver import I2cConnection
from sensirion_i2c_sht.sht3x import Sht3xI2cDevice
from sensirion_i2c_driver.linux_i2c_transceiver import LinuxI2cTransceiver
import paho.mqtt.client as mqtt

# LED pin assignments; the numbers are interpreted under the GPIO.BOARD
# mode selected just below.
LED_RED, LED_GREEN, LED_BLUE = 40, 38, 36

# Select the pin-numbering mode, then configure each LED pin as an output.
GPIO.setmode(GPIO.BOARD)
for _led_pin in (LED_RED, LED_GREEN, LED_BLUE):
    GPIO.setup(_led_pin, GPIO.OUT)

# Setup the SHT3x sensor on the /dev/i2c-0 bus
i2c_transceiver = LinuxI2cTransceiver('/dev/i2c-0')
i2c_connection = I2cConnection(i2c_transceiver)
sht3x = Sht3xI2cDevice(i2c_connection)

# Setup UART
# Open /dev/ttyS0 exactly once: 19200 baud, 8 data bits, no parity, one stop
# bit. Opening the port a second time would only discard the first handle
# without closing it explicitly.
uart = serial.Serial(
    port='/dev/ttyS0',
    baudrate=19200,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.EIGHTBITS,
    timeout=1,  # read timeout, in seconds
)

# MQTT setup
BROKER = "broker.emqx.io"
PORT = 1883
TOPIC_TEMP = "sensor/temperature"

# MQTT callback functions

def on_connect(client, userdata, flags, rc):
    """Print the outcome of an MQTT connection attempt.

    Args:
        client: Unused by this callback.
        userdata: Unused by this callback.
        flags: Unused by this callback.
        rc: Connection return code; a value equal to 0 is reported as
            success, anything else is reported as a failure with the code.
    """
    outcome = (
        "Connected to MQTT Broker!"
        if rc == 0
        else f"Failed to connect, return code {rc}"
    )
    print(outcome)


def on_publish(client, userdata, mid):
    """Print a notice containing the id of a published message.

    Args:
        client: Unused by this callback.
        userdata: Unused by this callback.
        mid: Message id included in the printed notice.
    """
    notice = f"Message {mid} published."
    print(notice)

# Create the MQTT client and attach the two callbacks defined above
client = mqtt.Client()
client.on_connect, client.on_publish = on_connect, on_publish

# Connect to the broker (keepalive of 60) and start the client's loop
client.connect(BROKER, port=PORT, keepalive=60)
client.loop_start()

# Temperature thresholds (degrees Celsius) that decide which LED is lit:
# above TEMP_HIGH_C -> red (and publish over MQTT), below TEMP_LOW_C -> blue,
# anything in between -> green.
TEMP_HIGH_C = 80
TEMP_LOW_C = 40

try:
    # Take one reading per second until the script is interrupted.
    while True:
        temperature, humidity = sht3x.single_shot_measurement()
        temp_value = temperature.degrees_celsius
        temp_text = f"{temp_value:0.2f}"

        # Switch every LED off first so only one is lit per reading.
        for led in (LED_RED, LED_GREEN, LED_BLUE):
            GPIO.output(led, GPIO.LOW)

        if temp_value > TEMP_HIGH_C:
            GPIO.output(LED_RED, GPIO.HIGH)

            # Publish the temperature to MQTT only while it exceeds the
            # high threshold; an rc of 0 is treated as success.
            result_temp = client.publish(TOPIC_TEMP, temp_text)
            if result_temp.rc == 0:
                print(f"Sent temperature to topic {TOPIC_TEMP}")
            else:
                print(f"Failed to send temperature to topic {TOPIC_TEMP}")
        elif temp_value < TEMP_LOW_C:
            GPIO.output(LED_BLUE, GPIO.HIGH)
        else:
            GPIO.output(LED_GREEN, GPIO.HIGH)

        # Send the temperature value over UART as one newline-terminated line.
        uart.write(f"{temp_text}\n".encode())

        print("SUCCESS! {} C, {:0.2f} %RH".format(
            temp_text,
            humidity.percent_rh))

        time.sleep(1.0)
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop the monitor.
    print("Stopped by user.")
finally:
    # Release hardware and network resources however the loop ended; any
    # other exception still propagates after this cleanup runs.
    for led in (LED_RED, LED_GREEN, LED_BLUE):
        GPIO.output(led, GPIO.LOW)
    GPIO.cleanup()
    uart.close()
    client.disconnect()
    client.loop_stop()
# (paste-site footer "Leave a Comment" -- not part of the script)