Untitled
python
2 months ago
2.4 kB
0
Indexable
Never
"""Subscribe to a machine's MQTT sensor topic and store each sample in PostgreSQL."""

import json
import random

import psycopg2
from paho.mqtt import client as mqtt_client

broker = "10.227.211.142"
port = 1883
machine_name = 'spot_weld_0'
topic = "{0}/sensor_data".format(machine_name)
# Generate a Client ID with the subscribe prefix.
client_id = f'subscribe-{random.randint(0, 100)}'

# Columns of "WELD_SAMPLES", in the order the payload values are bound to them.
_WELD_SAMPLE_COLUMNS = (
    "time_start",
    "time_end",
    "environment_t",
    "motor_bearing_t",
    "spindle_bearing_t",
    "counter",
    "sdintensity",
    "times",
    "angular_velocity",
    "force",
    "displacement",
)

# Parameterized insert; one %s placeholder per column.
_INSERT_STATEMENT = 'INSERT INTO "WELD_SAMPLES" ({0}) VALUES ({1})'.format(
    ", ".join(_WELD_SAMPLE_COLUMNS),
    ",".join(["%s"] * len(_WELD_SAMPLE_COLUMNS)),
)


def connect_db():
    """Open a connection to the PostgreSQL database.

    Returns:
        The psycopg2 connection, or ``None`` when the connection attempt
        fails (the error is printed).
    """
    # NOTE(review): host and credentials are hard-coded; consider moving them
    # to configuration.
    try:
        return psycopg2.connect(
            database="db",
            user="postgres",
            password="postgres",
            host="10.227.211.142",
            port="5432",
        )
    except psycopg2.Error as error:
        # Only database errors are handled here; anything else (for example
        # KeyboardInterrupt) propagates instead of being silently swallowed.
        print(f"An error occurred trying to connect to the database: {error}")
        return None


def connect_mqtt() -> mqtt_client.Client:
    """Create an MQTT client and connect it to ``broker``:``port``.

    Returns:
        The client, with an ``on_connect`` callback that prints the outcome
        of the connection attempt.
    """

    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}\n")

    # paho-mqtt 2.x requires the callback API version as first argument;
    # VERSION1 keeps the (client, userdata, flags, rc) signature used above.
    # paho-mqtt 1.x has no such enum and takes the client id directly.
    callback_api = getattr(mqtt_client, "CallbackAPIVersion", None)
    if callback_api is None:
        client = mqtt_client.Client(client_id)
    else:
        client = mqtt_client.Client(callback_api.VERSION1, client_id=client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def _payload_to_row(payload):
    """Decode an MQTT payload into the list of values for one insert.

    The payload must be a JSON object; its values are returned in the order
    the keys appear in the payload.
    NOTE(review): this assumes the publisher emits the keys in the same order
    as the table columns -- confirm against the publisher.

    Raises:
        ValueError: if the payload is not valid UTF-8/JSON, is not a JSON
            object, or does not hold exactly one value per column.
    """
    # The message arrives as bytes: decode to str, then parse into a dict.
    values_dict = json.loads(payload.decode())
    if not isinstance(values_dict, dict):
        raise ValueError("Expected a JSON object in the message payload")
    values_list = list(values_dict.values())
    if len(values_list) != len(_WELD_SAMPLE_COLUMNS):
        raise ValueError(
            "Expected {0} values in the message payload, got {1}".format(
                len(_WELD_SAMPLE_COLUMNS), len(values_list)
            )
        )
    return values_list


def _rollback(con):
    """Roll back the current transaction on ``con``, printing any failure."""
    try:
        con.rollback()
    except psycopg2.Error as error:
        print(error)


def subscribe(con, client: mqtt_client.Client):
    """Subscribe ``client`` to ``topic`` and insert every message into the DB.

    Args:
        con: Open psycopg2 connection used for the inserts.
        client: Connected MQTT client.
    """

    def on_message(client, userdata, msg):
        try:
            print(f"Received message from `{msg.topic}` topic")
            values_list = _payload_to_row(msg.payload)
        except ValueError as error:
            # Malformed message: report it and wait for the next one.
            print(error)
            return

        try:
            # The context manager closes the cursor even when execute fails.
            with con.cursor() as cursor_obj:
                cursor_obj.execute(_INSERT_STATEMENT, values_list)
            con.commit()
        except psycopg2.Error as error:
            print(error)
            # Without a rollback the connection stays in an aborted
            # transaction and every following insert fails as well.
            _rollback(con)
        except Exception as error:
            # Callback boundary: as before, one bad message must never stop
            # the MQTT network loop.
            print(error)

    client.subscribe(topic)
    client.on_message = on_message


def run():
    """Connect to the database and the broker, then process messages forever.

    Raises:
        SystemExit: if the database connection cannot be established.
    """
    con = connect_db()
    if con is None:
        # connect_db already printed the reason; without a connection every
        # message would fail, so stop here.
        raise SystemExit(1)
    try:
        client = connect_mqtt()
        subscribe(con, client)
        client.loop_forever()
    finally:
        con.close()


if __name__ == '__main__':
    run()