Untitled
unknown
plain_text
2 years ago
3.5 kB
7
Indexable
import paho.mqtt.client as mqtt
import json
from datetime import datetime
import time
from matplotlib import pyplot as plt
# Параметры подключения к MQTT-брокеру
HOST = "192.168.2.23" # IP чемодана
PORT = 1883 # Стандартный порт подключения для Mosquitto
KEEPALIVE = 60 # Время ожидания доставки сообщения, если при отправке оно будет превышено, брокер будет считаться недоступным
# Словарь с топиками и собираемыми из них параметрами
SUB_TOPICS = {
'/devices/wb-m1w2_14/controls/External Sensor 1': 'temperature',
'/devices/wb-msw-v3_21/controls/Sound Level': 'sound',
'/devices/wb-msw-v3_21/controls/CO2': 'CO2',
'/devices/wb-msw-v3_21/controls/Air Quality (VOC)': 'air quality',
'/devices/wb-msw-v3_21/': 'power'
}
JSON_LIST = []
# Создание словарей для хранения данных JSON и XML
JSON_DICT = {}
for value in SUB_TOPICS.values():
JSON_DICT[value] = 0
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
for topic in SUB_TOPICS.keys():
client.subscribe(topic)
co2_data = []
sound_data = []
power_data = []
time_data = []
# Create subplots for CO2, sound, and power
fig, (co2_ax, sound_ax, power_ax) = plt.subplots(3, 1, figsize=(10, 8))
fig.suptitle("Sensor Data")
def on_message(client, userdata, msg):
payload = msg.payload.decode()
topic = msg.topic
param_name = SUB_TOPICS[topic]
JSON_DICT[param_name] = payload
JSON_DICT['time'] = str(datetime.now())
# Append data to respective lists for plotting
if param_name == 'CO2':
co2_data.append(float(payload))
elif param_name == 'sound':
sound_data.append(float(payload))
elif param_name == 'power':
power_data.append(float(payload))
time_data.append(datetime.now())
# Update the plots
co2_ax.clear()
co2_ax.plot(time_data, co2_data, label='CO2')
co2_ax.set_title('CO2')
co2_ax.set_xlabel('Time')
co2_ax.set_ylabel('Value')
sound_ax.clear()
sound_ax.plot(time_data, sound_data, label='Sound')
sound_ax.set_title('Sound')
sound_ax.set_xlabel('Time')
sound_ax.set_ylabel('Value')
power_ax.clear()
power_ax.plot(time_data, power_data, label='Power')
power_ax.set_title('Power')
power_ax.set_xlabel('Time')
power_ax.set_ylabel('Value')
fig.tight_layout()
fig.canvas.flush_events()
# Update the JSON file
JSON_LIST.append(JSON_DICT)
with open('data.json', 'w') as json_file:
json_string = json.dumps(JSON_LIST, indent=4)
json_file.write(json_string)
def read_json_data(file_path):
try:
with open(file_path, 'r') as json_file:
data = json.load(json_file) # Загружаем JSON-данные
print("JSON Data:")
print(data)
except FileNotFoundError:
print(f"File {file_path} not found")
def main():
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, PORT, KEEPALIVE)
client.loop_start()
while True:
read_json_data('data.json')
time.sleep(5)
if __name__ == "__main__":
main()
Editor is loading...