Untitled
unknown
plain_text
2 years ago
3.5 kB
6
Indexable
import paho.mqtt.client as mqtt import json from datetime import datetime import time from matplotlib import pyplot as plt # Параметры подключения к MQTT-брокеру HOST = "192.168.2.23" # IP чемодана PORT = 1883 # Стандартный порт подключения для Mosquitto KEEPALIVE = 60 # Время ожидания доставки сообщения, если при отправке оно будет превышено, брокер будет считаться недоступным # Словарь с топиками и собираемыми из них параметрами SUB_TOPICS = { '/devices/wb-m1w2_14/controls/External Sensor 1': 'temperature', '/devices/wb-msw-v3_21/controls/Sound Level': 'sound', '/devices/wb-msw-v3_21/controls/CO2': 'CO2', '/devices/wb-msw-v3_21/controls/Air Quality (VOC)': 'air quality', '/devices/wb-msw-v3_21/': 'power' } JSON_LIST = [] # Создание словарей для хранения данных JSON и XML JSON_DICT = {} for value in SUB_TOPICS.values(): JSON_DICT[value] = 0 def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) for topic in SUB_TOPICS.keys(): client.subscribe(topic) co2_data = [] sound_data = [] power_data = [] time_data = [] # Create subplots for CO2, sound, and power fig, (co2_ax, sound_ax, power_ax) = plt.subplots(3, 1, figsize=(10, 8)) fig.suptitle("Sensor Data") def on_message(client, userdata, msg): payload = msg.payload.decode() topic = msg.topic param_name = SUB_TOPICS[topic] JSON_DICT[param_name] = payload JSON_DICT['time'] = str(datetime.now()) # Append data to respective lists for plotting if param_name == 'CO2': co2_data.append(float(payload)) elif param_name == 'sound': sound_data.append(float(payload)) elif param_name == 'power': power_data.append(float(payload)) time_data.append(datetime.now()) # Update the plots co2_ax.clear() co2_ax.plot(time_data, co2_data, label='CO2') co2_ax.set_title('CO2') co2_ax.set_xlabel('Time') co2_ax.set_ylabel('Value') sound_ax.clear() sound_ax.plot(time_data, sound_data, label='Sound') sound_ax.set_title('Sound') sound_ax.set_xlabel('Time') sound_ax.set_ylabel('Value') power_ax.clear() power_ax.plot(time_data, power_data, label='Power') power_ax.set_title('Power') power_ax.set_xlabel('Time') power_ax.set_ylabel('Value') fig.tight_layout() fig.canvas.flush_events() # Update the JSON file JSON_LIST.append(JSON_DICT) with open('data.json', 'w') as json_file: json_string = json.dumps(JSON_LIST, indent=4) json_file.write(json_string) def read_json_data(file_path): try: with open(file_path, 'r') as json_file: data = json.load(json_file) # Загружаем JSON-данные print("JSON Data:") print(data) except FileNotFoundError: print(f"File {file_path} not found") def main(): client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect(HOST, PORT, KEEPALIVE) client.loop_start() while True: read_json_data('data.json') time.sleep(5) if __name__ == "__main__": main()
Editor is loading...