Untitled
unknown
plain_text
a year ago
3.3 kB
5
Indexable
import time
import pigpio
import paho.mqtt.client as mqtt
# Sensor setup
DHT_PIN = 4 # GPIO4
# MQTT setup
MQTT_BROKER = "localhost" # Change if using an external broker
MQTT_TOPIC = "home/sensors/temperature"
client = mqtt.Client()
def connect_mqtt():
try:
client.connect(MQTT_BROKER, 1883, 60)
print("Connected to MQTT Broker")
except Exception as e:
print(f"Failed to connect to MQTT Broker: {e}")
return False
return True
class DHT22:
def __init__(self, pi, gpio):
self.pi = pi
self.gpio = gpio
self.high_tick = 0
self.bit = 40
self.temperature = 0.0
self.humidity = 0.0
self.either_edge_cb = None
self.setup()
def setup(self):
self.pi.set_mode(self.gpio, pigpio.INPUT)
self.pi.set_pull_up_down(self.gpio, pigpio.PUD_OFF)
self.either_edge_cb = self.pi.callback(self.gpio, pigpio.EITHER_EDGE, self.edge_callback)
def edge_callback(self, gpio, level, tick):
if level == 0:
if self.high_tick != 0:
t = pigpio.tickDiff(self.high_tick, tick)
if 50 <= t <= 80:
self.bit = 40
self.temperature = 0.0
self.humidity = 0.0
elif t < 180:
self.bit -= 1
if self.bit >= 32:
self.humidity = self.humidity << 1
if t > 110:
self.humidity += 1
elif 16 <= self.bit < 32:
self.temperature = self.temperature << 1
if t > 110:
self.temperature += 1
if self.bit == 0:
self.pi.set_watchdog(self.gpio, 0)
else:
self.high_tick = tick
def read(self):
self.pi.write(self.gpio, pigpio.LOW)
time.sleep(0.017) # 17 ms
self.pi.set_mode(self.gpio, pigpio.INPUT)
self.pi.set_watchdog(self.gpio, 200)
time.sleep(0.2)
if self.bit != 40:
return None
return self.temperature, self.humidity
def publish_sensor_data(dht22):
try:
result = dht22.read()
if result is not None:
temperature, humidity = result
sensor_data = f"Temperature: {temperature:.1f}C Humidity: {humidity:.1f}%"
print(sensor_data)
# Publish the sensor data to the MQTT broker
client.publish(MQTT_TOPIC, sensor_data)
else:
print("Failed to retrieve data from sensor")
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
if not connect_mqtt():
exit(1)
pi = pigpio.pi()
if not pi.connected:
print("Failed to connect to pigpio daemon")
exit(1)
dht22 = DHT22(pi, DHT_PIN)
try:
while True:
publish_sensor_data(dht22)
time.sleep(10) # Delay between sensor reads
except KeyboardInterrupt:
print("Stopping the script")
finally:
dht22.either_edge_cb.cancel()
pi.stop()
client.disconnect()Editor is loading...
Leave a Comment