Untitled
unknown
plain_text
5 months ago
3.3 kB
1
Indexable
import time import pigpio import paho.mqtt.client as mqtt # Sensor setup DHT_PIN = 4 # GPIO4 # MQTT setup MQTT_BROKER = "localhost" # Change if using an external broker MQTT_TOPIC = "home/sensors/temperature" client = mqtt.Client() def connect_mqtt(): try: client.connect(MQTT_BROKER, 1883, 60) print("Connected to MQTT Broker") except Exception as e: print(f"Failed to connect to MQTT Broker: {e}") return False return True class DHT22: def __init__(self, pi, gpio): self.pi = pi self.gpio = gpio self.high_tick = 0 self.bit = 40 self.temperature = 0.0 self.humidity = 0.0 self.either_edge_cb = None self.setup() def setup(self): self.pi.set_mode(self.gpio, pigpio.INPUT) self.pi.set_pull_up_down(self.gpio, pigpio.PUD_OFF) self.either_edge_cb = self.pi.callback(self.gpio, pigpio.EITHER_EDGE, self.edge_callback) def edge_callback(self, gpio, level, tick): if level == 0: if self.high_tick != 0: t = pigpio.tickDiff(self.high_tick, tick) if 50 <= t <= 80: self.bit = 40 self.temperature = 0.0 self.humidity = 0.0 elif t < 180: self.bit -= 1 if self.bit >= 32: self.humidity = self.humidity << 1 if t > 110: self.humidity += 1 elif 16 <= self.bit < 32: self.temperature = self.temperature << 1 if t > 110: self.temperature += 1 if self.bit == 0: self.pi.set_watchdog(self.gpio, 0) else: self.high_tick = tick def read(self): self.pi.write(self.gpio, pigpio.LOW) time.sleep(0.017) # 17 ms self.pi.set_mode(self.gpio, pigpio.INPUT) self.pi.set_watchdog(self.gpio, 200) time.sleep(0.2) if self.bit != 40: return None return self.temperature, self.humidity def publish_sensor_data(dht22): try: result = dht22.read() if result is not None: temperature, humidity = result sensor_data = f"Temperature: {temperature:.1f}C Humidity: {humidity:.1f}%" print(sensor_data) # Publish the sensor data to the MQTT broker client.publish(MQTT_TOPIC, sensor_data) else: print("Failed to retrieve data from sensor") except Exception as e: print(f"An error occurred: {e}") if __name__ == "__main__": if not connect_mqtt(): exit(1) pi = pigpio.pi() if not pi.connected: print("Failed to connect to pigpio daemon") exit(1) dht22 = DHT22(pi, DHT_PIN) try: while True: publish_sensor_data(dht22) time.sleep(10) # Delay between sensor reads except KeyboardInterrupt: print("Stopping the script") finally: dht22.either_edge_cb.cancel() pi.stop() client.disconnect()
Editor is loading...
Leave a Comment