Untitled

mail@pastecode.io avatar
unknown
plain_text
7 months ago
2.1 kB
1
Indexable
Never
from flask import Flask, render_template, jsonify
import paho.mqtt.client as mqtt
import ssl
import json
from flasgger import Swagger
from flask_swagger_ui import get_swaggerui_blueprint

app = Flask(__name__)

# MQTT broker connection settings.
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a secrets store and rotating this password.
client_username = "Foxtrot"
client_password = "kEkTyCr1mI6A2s3O"
client_url = "myggen.mooo.com"
client_port = 8883

# Most recent parsed MQTT payload: rebound by on_message() and read by the
# /messages endpoint. It starts as an empty dict (not a list) because it is
# indexed by string key.
data = {}


# URL path under which the Swagger UI blueprint is registered.
SWAGGER_URL = "/api/docs"
# Location of the API specification handed to the Swagger UI blueprint.
API_URL = "/static/swagger.json"

# Build the Swagger UI blueprint; "app_name" is passed through as UI config.
swaggerui_blueprint = get_swaggerui_blueprint(
    SWAGGER_URL, API_URL, config={"app_name": "P2P"}
)

# Attach the Swagger UI blueprint to the Flask app under SWAGGER_URL.
app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)


@app.route("/", methods=["GET"])
def index():
    """Serve the root URL by rendering the ``index.html`` template."""
    page = render_template("index.html")
    return page


@app.route("/messages", methods=["GET"])
def messages():
    """
    Return the latest eco/tvoc readings stored from MQTT.
    ---
    responses:
      200:
        description: The most recent readings; a field is null when the stored payload did not include it
      503:
        description: No usable reading has been received yet
    """
    fields = ("eco", "ecoMin", "ecoMax", "tvoc", "tvocMin", "tvocMax")

    # Take a single reference up front: on_message() rebinds the module-level
    # ``data``, so every field below is read from the same payload.
    latest = data

    # Before the first message arrives (or after a payload with none of the
    # expected fields) there is nothing meaningful to report. Answer with 503
    # instead of failing with TypeError/KeyError and a 500.
    if not isinstance(latest, dict) or not any(name in latest for name in fields):
        return jsonify({"error": "No sensor data received yet"}), 503

    # Fields absent from the payload are returned as null rather than raising.
    return jsonify({name: latest.get(name) for name in fields})


def on_connect(client, userdata, flags, rc):
    """Handle the result of a connection attempt to the MQTT broker.

    A result code equal to 0 is treated as success: the code is printed and
    the client subscribes to ``Foxtrot/#``. Any other result code only prints
    a failure notice.

    Args:
        client: Client object on which ``subscribe`` is called on success.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means the connection succeeded.
    """
    if rc != 0:
        print("Connection Failed!")
        return

    # ``!s`` keeps the output identical to concatenating ``str(rc)``.
    print(f"Connected with result code {rc!s}")
    client.subscribe("Foxtrot/#")


def on_message(client, userdata, msg):
    """Store the latest JSON object received from the MQTT broker.

    The payload is decoded as UTF-8 and parsed as JSON. When the result is a
    JSON object, the module-level ``data`` is rebound to it and the raw
    message is printed. Payloads that are not valid UTF-8, not valid JSON, or
    not a JSON object are reported and ignored, so the previously stored
    reading is kept and no exception escapes the callback.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; only ``msg.payload`` (bytes) is read.
    """
    global data

    try:
        message = msg.payload.decode("utf-8")
        parsed = json.loads(message)
    except ValueError as exc:
        # UnicodeDecodeError and json.JSONDecodeError both subclass ValueError.
        print("Ignoring undecodable message:", exc)
        return

    # The /messages endpoint indexes ``data`` by string key, so only a JSON
    # object is a usable reading.
    if not isinstance(parsed, dict):
        print("Ignoring non-object message: " + message)
        return

    data = parsed
    print("Received Message: " + message)


# Create and configure the MQTT client. All of this runs at import time.
# NOTE(review): paho-mqtt 2.x changed the Client constructor (callback API
# version argument) - confirm which paho-mqtt version this is pinned to.
client = mqtt.Client()
client.username_pw_set(client_username, client_password)
# NOTE(review): ssl.PROTOCOL_TLSv1_2 is marked deprecated in the Python ssl
# docs - confirm whether pinning TLS 1.2 is actually required by the broker.
client.tls_set(tls_version=ssl.PROTOCOL_TLSv1_2)
client.on_connect = on_connect
client.on_message = on_message

# A failure of the initial connect is only printed; the module keeps loading
# so the Flask app can still start.
try:
    client.connect(client_url, client_port, keepalive=60)
except Exception as e:
    print("Error connecting to MQTT broker:", str(e))
# The network loop is started unconditionally, i.e. also when connect() raised.
# NOTE(review): confirm how paho's loop behaves after a failed first connect.
client.loop_start()


# Start Flask's built-in server only when this file is executed as a script.
# NOTE(review): debug=True turns on Flask's debug mode (interactive debugger
# and reloader per the Flask docs) - confirm this entry point is dev-only.
if __name__ == "__main__":
    app.run(debug=True)