# Untitled
# unknown
# plain_text
# a year ago
# 1.8 kB
# 13
# Indexable
import paho.mqtt.client as mqtt
import RPi.GPIO as GPIO
import time
# --- Configuration ---------------------------------------------------------
# Header pin numbers; interpreted using the BOARD numbering selected below.
LED_PIN = 7
SERVO_PIN = 8

# PubNub credentials and the channel this device listens on.
# NOTE(review): the keys are hard-coded in source; consider loading them from
# the environment or a config file that is kept out of version control.
# NOTE(review): PUBNUB_SUBSCRIBE_KEY is not referenced anywhere in the visible
# code - confirm whether the broker needs it.
PUBNUB_PUBLISH_KEY = 'pub-c-457d49c1-ded9-40fb-9f36-400bafc3563b'
PUBNUB_SUBSCRIBE_KEY = 'sub-c-abae8874-67d0-4eaf-9e80-65adbc98afba'
CHANNEL = 'lab4'

# --- Hardware initialisation -----------------------------------------------
GPIO.setmode(GPIO.BOARD)
GPIO.setup(LED_PIN, GPIO.OUT)
GPIO.setup(SERVO_PIN, GPIO.OUT)

# PWM on the servo pin at 50 Hz, started with a duty cycle of 0.
servo = GPIO.PWM(SERVO_PIN, 50)
servo.start(0)
def on_connect(client, userdata, flags, rc):
    """Handle the broker's connection response and subscribe on success.

    Args:
        client: The MQTT client instance that received the response.
        userdata: User data registered on the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; 0 means the connection was accepted.
    """
    if rc != 0:
        # The broker refused the connection, so a subscribe cannot succeed;
        # report the failure instead of claiming to be connected.
        print("Connection failed with result code " + str(rc))
        return
    print("Connected with result code " + str(rc))
    client.subscribe(CHANNEL)
def _pulse_servo(duty_cycle):
    """Drive the servo at *duty_cycle* for one second, then idle the signal."""
    servo.ChangeDutyCycle(duty_cycle)
    # Allow time for movement. NOTE: this blocks the calling thread, so no
    # other MQTT message is handled until the pulse has finished.
    time.sleep(1)
    servo.ChangeDutyCycle(0)  # Stop sending signal


def on_message(client, userdata, msg):
    """Decode an incoming MQTT message and run the command it contains.

    Commands are matched as substrings of the decoded payload; the first
    match in the order lightOn, lightOff, servoOpen, servoClose wins.
    Payloads containing none of them are printed and otherwise ignored.

    Args:
        client: The MQTT client instance that received the message (unused).
        userdata: User data registered on the client (unused).
        msg: The received message; only its ``payload`` bytes are read.
    """
    # The payload comes off the network: substitute undecodable bytes rather
    # than letting UnicodeDecodeError escape from the callback.
    message = msg.payload.decode(errors="replace")
    print(f"Received message: {message}")
    if 'lightOn' in message:
        GPIO.output(LED_PIN, GPIO.HIGH)  # Turn LED on
    elif 'lightOff' in message:
        GPIO.output(LED_PIN, GPIO.LOW)  # Turn LED off
    elif 'servoOpen' in message:
        _pulse_servo(2)  # Adjust as needed for your servo
    elif 'servoClose' in message:
        _pulse_servo(12)  # Adjust as needed for your servo
# Build the MQTT client. paho-mqtt 2.x exposes CallbackAPIVersion and expects
# it as the first constructor argument; the callbacks in this file use the
# version-1 signatures. paho-mqtt 1.x has no such attribute, and its first
# positional argument is the client id, so nothing is passed there.
_callback_api = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api is not None:
    client = mqtt.Client(_callback_api.VERSION1)
else:
    client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# Connect to PubNub MQTT
# NOTE(review): only the publish key is sent (as the username, with an empty
# password) - confirm this matches the credential format the broker expects.
client.username_pw_set(PUBNUB_PUBLISH_KEY, '')
try:
    # Connecting inside the try block guarantees the GPIO pins are released
    # even when the broker cannot be reached.
    client.connect('mqtt.pubnub.com', 1883, 60)
    client.loop_forever()
except KeyboardInterrupt:
    pass
finally:
    # Stop the PWM signal first, while the pin is still configured, and only
    # then release the GPIO channels.
    servo.stop()  # Stop the servo PWM
    GPIO.cleanup()  # Clean up GPIO on exit
# Editor is loading...
# Leave a Comment