a year ago
4.5 kB
import time
import math
import uwebsockets.client
from uwebsockets.protocol import ConnectionClosed
from driver import MotorDriver
import network
import usocket as socket
from config import (
    laravel_access_hash,
    access_hash,
    ssid,
    password,
    ws_protocol,
    ws_host,
    ws_port,
    MOTOR_SERVICE_UUID,
    MOTOR_CHAR_UUID,
    MOTOR_IDLE,
    MOTOR_START,
    MOTOR_STOP,
    _ADV_INTERVAL_US,
)
import aioble
import bluetooth
import uasyncio as asyncio
import _thread
from threadsafe import Message
import machine
import utime
import gc

# Number of one-second polls wifi_connect() makes before rebooting the board.
_WIFI_CONNECT_ATTEMPTS = 10

# Pause (seconds) after a failed websocket cycle, so a dead server does not
# turn websocket_loop() into a busy loop that starves the BLE tasks.
_WS_RETRY_DELAY_S = 1

m = MotorDriver()

# Websocket Received Data
resp = None

# Websocket Data to Send Back To Laravel
message_to_send = None


async def unblock(func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` in a new thread and await its result.

    The return value of ``func`` is passed back through a
    ``threadsafe.Message``, which this coroutine awaits; the intent is that
    other tasks can keep running while ``func`` blocks.
    """
    def wrap(func, message, args, kwargs):
        message.set(func(*args, **kwargs))  # Run the blocking function.

    msg = Message()
    _thread.start_new_thread(wrap, (func, msg, args, kwargs))
    return await msg


# Define an async function to advertise the service and wait for connection
async def advertise_and_wait():
    """Advertise the motor service forever, one connection at a time.

    Each cycle advertises, waits for the resulting connection to report
    disconnection, calls ``aioble.stop()``, sleeps one second and repeats.
    """
    while True:
        try:
            # Start advertising with the service UUID, name, and appearance
            connection = await aioble.advertise(
                _ADV_INTERVAL_US,
                name="Aquacrom Device",
                services=[MOTOR_SERVICE_UUID],
                appearance=0x0200,
            )
            await connection.disconnected()
        except Exception as exc:
            # Best effort: report the failure and advertise again.
            # ``Exception`` (not a bare ``except``) so that task cancellation
            # and Ctrl-C are not swallowed.
            print('BLE advertise error:', exc)
        finally:
            aioble.stop()
        await asyncio.sleep(1)


def motor_start():
    """Start the 'MD' motor in the 'forward' direction with speed argument 50."""
    m.MotorRun('MD', 'forward', 50)


def motor_stop():
    """Stop the 'MD' motor."""
    m.MotorStop('MD')


async def checkMotorCharValue(motor_char):
    """Poll ``motor_char`` every 3 seconds and act on start/stop commands.

    When the characteristic holds the encoded ``MOTOR_START`` or
    ``MOTOR_STOP`` value, the motor is started or stopped and the
    characteristic is written back to ``MOTOR_IDLE``. Free heap is printed on
    every poll.
    """
    # Loop-invariant: encode the command strings once.
    start_cmd = MOTOR_START.encode('utf-8')
    stop_cmd = MOTOR_STOP.encode('utf-8')

    while True:
        print(gc.mem_free())
        read_value = motor_char.read()
        if read_value == start_cmd:
            motor_start()
            motor_char.write(MOTOR_IDLE)
        elif read_value == stop_cmd:
            motor_stop()
            motor_char.write(MOTOR_IDLE)
        await asyncio.sleep(3)


def websocket_receive(websocket):
    """Blocking receive of one websocket message; always closes the socket.

    Stores the received value in the global ``resp``. On "START_MOTOR" or
    "STOP_MOTOR" it queues an OK reply in the global ``message_to_send`` and
    starts or stops the motor. On ``ConnectionClosed``, ``KeyboardInterrupt``
    or ``OSError`` it sets ``resp`` to ``None``.
    """
    global resp
    global message_to_send
    try:
        resp = websocket.recv()
        if resp == "START_MOTOR":
            message_to_send = 'send-message-to=' + laravel_access_hash + '&message=OK'
            motor_start()
        elif resp == "STOP_MOTOR":
            message_to_send = 'send-message-to=' + laravel_access_hash + '&message=OK'
            motor_stop()
    except (ConnectionClosed, KeyboardInterrupt, OSError):
        resp = None
    finally:
        websocket.close()


def wifi_connect():
    """Connect the station interface to WiFi, rebooting the board on failure.

    Returns immediately if already connected. Otherwise polls once per second
    (blocking) up to ``_WIFI_CONNECT_ATTEMPTS`` times, printing a dot per
    failed poll, and calls ``machine.reset()`` if the link never comes up.
    """
    # connect to the internet
    sta_if = network.WLAN(network.STA_IF)
    if sta_if.isconnected():
        return

    sta_if.active(True)
    sta_if.connect(ssid, password)
    for _ in range(_WIFI_CONNECT_ATTEMPTS):
        if sta_if.isconnected():
            return
        print('.', end='')
        utime.sleep(1)

    machine.reset()


def _close_quietly(websocket):
    """Close ``websocket`` if one was opened, reporting socket errors."""
    if websocket is None:
        return
    try:
        websocket.close()
    except OSError as exc:
        print('websocket close error:', exc)


async def websocket_loop():
    """Keep a websocket session to the server going forever.

    Each cycle connects, sends any reply queued in ``message_to_send``, then
    waits (in a worker thread via ``unblock``) for one incoming message. The
    socket is closed at the end of every cycle, successful or not.
    """
    global message_to_send
    wifi_connect()
    url = ws_protocol + "://" + ws_host + ":" + str(ws_port) + "/access-hash=" + access_hash

    while True:
        # Reset per cycle so a failed connect() never leaves us holding (and
        # closing) a stale or non-existent socket.
        websocket = None
        try:
            websocket = uwebsockets.client.connect(url)
            if message_to_send is not None:
                websocket.send(message_to_send)
                message_to_send = None
            await unblock(websocket_receive, websocket)
        except Exception as exc:
            print('websocket error:', exc)
            # Yield to the event loop before retrying; without this a
            # persistent failure would spin here and block the other tasks.
            await asyncio.sleep(_WS_RETRY_DELAY_S)
        finally:
            _close_quietly(websocket)


async def main():
    """Register the BLE motor service and run the BLE and websocket tasks."""
    print(gc.mem_free())

    # Create a service and a characteristic for the motor control
    motor_service = aioble.Service(MOTOR_SERVICE_UUID)
    motor_char = aioble.Characteristic(
        motor_service,
        MOTOR_CHAR_UUID,
        read=True,
        write=True,
        initial=MOTOR_IDLE,
    )

    # Register the service with aioble
    aioble.register_services(motor_service)

    tasks = [
        asyncio.create_task(checkMotorCharValue(motor_char)),
        asyncio.create_task(advertise_and_wait()),
        asyncio.create_task(websocket_loop()),
    ]
    await asyncio.gather(*tasks)


asyncio.run(main())