# --- Paste-site metadata (not part of the script) ---
# Untitled
# unknown
# plain_text
# a year ago
# 3.1 kB
# 14
# Indexable
import paho.mqtt.client as mqtt
import random
import time
from datetime import datetime
# MQTT settings
broker_address = "192.168.1.102"  # Host passed to client.connect()
topic = "CPU4"  # Topic every generated message is published to
# NOTE(review): the original line carried a stray `1` after the closing quote
# (a syntax error). Assumed to be paste residue -- confirm the id was not
# meant to be "RaspiLoRa1".
client_id = "RaspiLoRa"
port = 1883  # Default MQTT port
# Callback function when the client connects to the broker
def on_connect(client, userdata, flags, rc):
    """Report the outcome of a connection attempt.

    Args:
        client: The client instance that triggered the callback (unused).
        userdata: User data attached to the client (unused).
        flags: Connection flags supplied with the callback (unused).
        rc: Connection result code; ``0`` is treated as success, any other
            value is reported as a failure.
    """
    if rc == 0:
        print("Connected successfully to broker")
    else:
        print(f"Connection failed with code {rc}")
# Callback function when a message is published
def on_publish(client, userdata, mid):
    """Log that a publish callback fired.

    Args:
        client: The client instance that triggered the callback (unused).
        userdata: User data attached to the client (unused).
        mid: Message id of the published message (unused).
    """
    print("Message published")
# Function to generate the payload
import random
from datetime import datetime
def generate_series(n=16, variation_limit=0.02, above_limit_factor=0.05):
    """Return a random-walk series of ``n`` floats.

    The first value is drawn uniformly from [1.0, 100.0]. Every later value
    is the previous value plus a relative change; for each step a fair coin
    picks whether that change is drawn uniformly from
    ``[-variation_limit, variation_limit]`` or from
    ``[-above_limit_factor, above_limit_factor]``.

    Note that the wider band is symmetric around zero, so a step drawn from
    it is *allowed* to exceed ``variation_limit`` but is not guaranteed to.

    Args:
        n: Number of values to generate.
        variation_limit: Half-width of the narrow relative-change band.
        above_limit_factor: Half-width of the wide relative-change band.

    Returns:
        A list of ``n`` floats; an empty list when ``n <= 0``.
    """
    # Without this guard a non-positive n would still yield the seed value.
    if n <= 0:
        return []

    series = [random.uniform(1.0, 100.0)]  # Starting point
    for _ in range(n - 1):
        # Coin flip first, then one uniform draw, for every step.
        if random.choice([True, False]):
            limit = variation_limit
        else:
            limit = above_limit_factor
        previous = series[-1]
        series.append(previous + previous * random.uniform(-limit, limit))
    return series
def generate_payload():
    """Build one simulated message string for publishing.

    Layout (fields separated by ``#``)::

        CPU4#ADC<k>#<f1>,...,<f16>,<b1>,...,<b10>#<YYYY:MM:DD>#<HH:MM:SS>

    where ``k`` is a random integer in 1..4, the ``f`` values come from
    ``generate_series()`` formatted to two decimals, and the ``b`` values are
    random ``0``/``1`` digits. Floats and bits share one comma-separated
    field (no ``#`` between them).

    Returns:
        The assembled message as a ``str``.
    """
    # Random integer between 1 and 4
    adc_number = random.randint(1, 4)
    # Values from generate_series() rendered with two-decimal precision
    float_values = [f"{num:.2f}" for num in generate_series()]
    # 10 random binary values (0 or 1)
    binary_values = [str(random.randint(0, 1)) for _ in range(10)]

    # Sample the clock once so the date and time fields always describe the
    # same instant (two separate now() calls could straddle midnight).
    now = datetime.now()
    current_date = now.strftime("%Y:%m:%d")
    current_time = now.strftime("%H:%M:%S")

    readings = ",".join(float_values + binary_values)
    return "#".join(
        ["CPU4", f"ADC{adc_number}", readings, current_date, current_time]
    )
# Create a new MQTT client instance.
# paho-mqtt 2.x takes the callback API version as the first positional
# argument, so passing only the client id there fails. VERSION1 keeps the
# legacy callback signatures used by on_connect/on_publish in this file;
# older releases do not have the enum and take the client id directly.
_callback_api = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api is not None:
    client = mqtt.Client(_callback_api.VERSION1, client_id=client_id)
else:
    client = mqtt.Client(client_id)

# Assign the callback functions
client.on_connect = on_connect
client.on_publish = on_publish

# Connect to the MQTT broker
client.connect(broker_address, port)

# Start the background network loop
client.loop_start()

try:
    # Send messages periodically
    while True:
        message = generate_payload()
        result = client.publish(topic, message)
        # Wait for the message to be sent and processed
        result.wait_for_publish()
        print(f"Message sent: {message}")
        # Wait for 5 seconds before sending the next message
        time.sleep(5)
except KeyboardInterrupt:
    print("Terminating program...")
finally:
    # Stop the loop and disconnect from the broker. Done in `finally` so the
    # cleanup also runs when publishing fails with an unexpected exception.
    client.loop_stop()
    client.disconnect()