Untitled

 avatar
unknown
plain_text
5 months ago
3.3 kB
1
Indexable
import time
import pigpio
import paho.mqtt.client as mqtt

# Sensor setup
DHT_PIN = 4  # GPIO4

# MQTT setup
MQTT_BROKER = "localhost"  # Change if using an external broker
MQTT_TOPIC = "home/sensors/temperature"

client = mqtt.Client()

def connect_mqtt():
    try:
        client.connect(MQTT_BROKER, 1883, 60)
        print("Connected to MQTT Broker")
    except Exception as e:
        print(f"Failed to connect to MQTT Broker: {e}")
        return False
    return True

class DHT22:
    def __init__(self, pi, gpio):
        self.pi = pi
        self.gpio = gpio
        self.high_tick = 0
        self.bit = 40
        self.temperature = 0.0
        self.humidity = 0.0
        self.either_edge_cb = None
        self.setup()

    def setup(self):
        self.pi.set_mode(self.gpio, pigpio.INPUT)
        self.pi.set_pull_up_down(self.gpio, pigpio.PUD_OFF)
        self.either_edge_cb = self.pi.callback(self.gpio, pigpio.EITHER_EDGE, self.edge_callback)

    def edge_callback(self, gpio, level, tick):
        if level == 0:
            if self.high_tick != 0:
                t = pigpio.tickDiff(self.high_tick, tick)
                if 50 <= t <= 80:
                    self.bit = 40
                    self.temperature = 0.0
                    self.humidity = 0.0
                elif t < 180:
                    self.bit -= 1
                    if self.bit >= 32:
                        self.humidity = self.humidity << 1
                        if t > 110:
                            self.humidity += 1
                    elif 16 <= self.bit < 32:
                        self.temperature = self.temperature << 1
                        if t > 110:
                            self.temperature += 1
                    if self.bit == 0:
                        self.pi.set_watchdog(self.gpio, 0)
        else:
            self.high_tick = tick

    def read(self):
        self.pi.write(self.gpio, pigpio.LOW)
        time.sleep(0.017)  # 17 ms
        self.pi.set_mode(self.gpio, pigpio.INPUT)
        self.pi.set_watchdog(self.gpio, 200)
        time.sleep(0.2)
        if self.bit != 40:
            return None
        return self.temperature, self.humidity

def publish_sensor_data(dht22):
    try:
        result = dht22.read()
        if result is not None:
            temperature, humidity = result
            sensor_data = f"Temperature: {temperature:.1f}C Humidity: {humidity:.1f}%"
            print(sensor_data)
            # Publish the sensor data to the MQTT broker
            client.publish(MQTT_TOPIC, sensor_data)
        else:
            print("Failed to retrieve data from sensor")
    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    if not connect_mqtt():
        exit(1)
    
    pi = pigpio.pi()
    if not pi.connected:
        print("Failed to connect to pigpio daemon")
        exit(1)

    dht22 = DHT22(pi, DHT_PIN)

    try:
        while True:
            publish_sensor_data(dht22)
            time.sleep(10)  # Delay between sensor reads
    except KeyboardInterrupt:
        print("Stopping the script")
    finally:
        dht22.either_edge_cb.cancel()
        pi.stop()
        client.disconnect()
Editor is loading...
Leave a Comment