Untitled

 avatar
unknown
plain_text
5 months ago
3.2 kB
4
Indexable
# Import necessary libraries for PubNub and GPIO control
from pubnub.pubnub import PubNub, SubscribeListener
from pubnub.pnconfiguration import PNConfiguration
import RPi.GPIO as GPIO
import time

# Build the PubNub client that the rest of this script publishes and subscribes through.
# NOTE(review): the publish/subscribe keys are hard-coded in source; consider loading
# them from the environment or an untracked config file — confirm with the key owner.
pubnub_config = PNConfiguration()
pubnub_config.publish_key = 'pub-c-5ebab5ae-86fe-439d-83cd-77adee39c5c6'  # Key used when publishing to the keyset
pubnub_config.subscribe_key = 'sub-c-a7928bd8-ed5f-47c3-96c6-60301152332a'  # Key used when subscribing to the keyset
pubnub_config.uuid = 'zhengyang'  # Identifier this client presents to PubNub
pubnub_instance = PubNub(pubnub_config)  # Must be created after the config fields above are set

# GPIO pin setup. Pin numbers below are interpreted in BCM mode (see setmode),
# so they must be chosen before any GPIO.setup() call.
LED_GPIO_PIN = 6  # BCM pin number driving the LED
SERVO_GPIO_PIN = 12  # BCM pin number carrying the servo PWM signal
GPIO.setmode(GPIO.BCM)  # Select BCM numbering for every pin used in this script
GPIO.setup(LED_GPIO_PIN, GPIO.OUT)  # LED pin is driven by this script, so make it an output
GPIO.setup(SERVO_GPIO_PIN, GPIO.OUT)  # Servo pin must be an output before PWM can be attached

# Create a 50 Hz PWM channel on the servo pin; control_servo() later changes its duty cycle.
servo_motor = GPIO.PWM(SERVO_GPIO_PIN, 50)
servo_motor.start(0)  # Begin PWM at 0% duty cycle, i.e. no position is commanded yet

# Single channel used both for the startup announcement and for incoming commands.
pubnub_channel = 'cui'

# Register a SubscribeListener and subscribe to the channel. The same listener object
# is what the main loop later calls wait_for_message_on() against.
pubnub_listener = SubscribeListener()
pubnub_instance.add_listener(pubnub_listener)  # Listener must be attached before subscribing
pubnub_instance.subscribe().channels(pubnub_channel).execute()  # Start the subscription on pubnub_channel

# Block until the listener reports that the subscription is connected.
pubnub_listener.wait_for_connect()
print('Successfully connected to PubNub.')

# Announce startup on the same channel this script listens on.
# NOTE(review): because the script is subscribed to this channel, the main loop will
# presumably receive this message too (it matches no command, so it is ignored) — confirm.
startup_message = {'myMessage': 'Script initialized'}
pubnub_instance.publish().channel(pubnub_channel).message(startup_message).sync()  # Synchronous publish
print("Initialization message sent to the PubNub channel.")

# LED control
def control_led(state):
    """Drive the LED pin according to *state*.

    Args:
        state: 'ON' sets LED_GPIO_PIN high, 'OFF' sets it low. Any other
            value is ignored: nothing is written to the pin and nothing
            is printed.
    """
    # Decide the output level and the status line first, then act once.
    if state == 'ON':
        pin_level, status_line = GPIO.HIGH, "LED is ON."
    elif state == 'OFF':
        pin_level, status_line = GPIO.LOW, "LED is OFF."
    else:
        return

    GPIO.output(LED_GPIO_PIN, pin_level)
    print(status_line)

# Servo control
def control_servo(state):
    """Move the servo by changing the PWM duty cycle on servo_motor.

    Args:
        state: 'ON' sets the duty cycle to 7.5 (treated as the 90-degree
            position; adjust for the servo in use). 'OFF' sets it to 2.5,
            which is still a position command rather than a power-off —
            presumably the servo's 0-degree end stop, confirm against the
            hardware. Any other value is ignored.
    """
    # Pick the duty cycle and status line, then issue a single PWM update.
    if state == 'ON':
        duty_cycle, status_line = 7.5, "Servo motor activated (90 degrees)."
    elif state == 'OFF':
        duty_cycle, status_line = 2.5, "Servo motor deactivated."
    else:
        return

    servo_motor.ChangeDutyCycle(duty_cycle)
    print(status_line)

# Main loop: wait for messages on the PubNub channel and translate each
# recognised command into an LED or servo action. Runs until Ctrl+C.
try:
    while True:
        # Wait for the next message the listener received on this channel.
        received_message = pubnub_listener.wait_for_message_on(pubnub_channel)
        payload = received_message.message

        # Any client holding the publish key can post strings, numbers or lists
        # to the channel. Only dict payloads can carry a 'myMessage' command, so
        # skip everything else instead of crashing on a missing .get().
        if not isinstance(payload, dict):
            print(f"Ignoring message that is not a dict: {payload!r}")
            continue

        message_content = payload.get('myMessage')
        print(f"New message received: {message_content}")

        # Control devices based on received messages; unknown commands
        # (including this script's own startup message) are ignored.
        if message_content == 'LED ON':
            control_led('ON')
        elif message_content == 'LED OFF':
            control_led('OFF')
        elif message_content == 'SERVO ON':
            control_servo('ON')
        elif message_content == 'SERVO OFF':
            control_servo('OFF')

except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop the script; fall through to cleanup.
    pass

finally:
    try:
        # Release the hardware first so the pins are reset even if the
        # network shutdown below fails.
        servo_motor.stop()
        GPIO.cleanup()
    finally:
        # Stop the PubNub subscription machinery so its worker threads do not
        # outlive the script.
        pubnub_instance.stop()
Editor is loading...
Leave a Comment