Untitled
unknown
python
a year ago
4.5 kB
102
Indexable
Never
import time import math import uwebsockets.client from uwebsockets.protocol import ConnectionClosed from driver import MotorDriver import network import usocket as socket from config import laravel_access_hash, access_hash, ssid, password, ws_protocol, ws_host, ws_port, MOTOR_SERVICE_UUID, MOTOR_CHAR_UUID, MOTOR_IDLE, MOTOR_START, MOTOR_STOP, _ADV_INTERVAL_US import aioble import bluetooth import uasyncio as asyncio import _thread from threadsafe import Message import machine import utime import gc m = MotorDriver() #Websocket Received Data resp = None #Websocket Data to Send Back To Laravel message_to_send = None async def unblock(func, *args, **kwargs): def wrap(func, message, args, kwargs): message.set(func(*args, **kwargs)) # Run the blocking function. msg = Message() _thread.start_new_thread(wrap, (func, msg, args, kwargs)) return await msg # Define an async function to advertise the service and wait for connection async def advertise_and_wait(): while True: try: # Start advertising with the service UUID, name, and appearance connection = await aioble.advertise( _ADV_INTERVAL_US, name="Aquacrom Device", services=[MOTOR_SERVICE_UUID], appearance=0x0200, ) await connection.disconnected() except: pass finally: aioble.stop() await asyncio.sleep(1) def motor_start(): m.MotorRun('MD', 'forward', 50) def motor_stop(): m.MotorStop('MD') async def checkMotorCharValue(motor_char): while True: print(gc.mem_free()) read_value = motor_char.read() if read_value == MOTOR_START.encode('utf-8'): motor_start() motor_char.write(MOTOR_IDLE) elif read_value == MOTOR_STOP.encode('utf-8'): motor_stop() motor_char.write(MOTOR_IDLE) else: pass await asyncio.sleep(3) def websocket_receive(websocket): global resp global message_to_send try: resp = websocket.recv() if resp == "START_MOTOR": message_to_send = 'send-message-to=' + laravel_access_hash + '&message=OK' #websocket.close() motor_start() if resp == "STOP_MOTOR": message_to_send = 'send-message-to=' + laravel_access_hash + '&message=OK' #websocket.close() motor_stop() except ConnectionClosed: resp = None except KeyboardInterrupt: resp = None except OSError as e: resp = None finally: websocket.close() return def wifi_connect(): # connect to the internet sta_if = network.WLAN(network.STA_IF) count = 0 if not sta_if.isconnected(): sta_if.active(True) sta_if.connect(ssid, password) while (count < 10): count += 1 status = sta_if.status() if (sta_if.isconnected()): count = 0 break print('.', end = '') utime.sleep(1) if (count == 10): count = 0 machine.reset() async def websocket_loop(): global message_to_send wifi_connect() websocket = None while True: try: websocket = uwebsockets.client.connect(ws_protocol + "://" + ws_host + ":" + str(ws_port) + "/access-hash=" + access_hash) if message_to_send is not None: websocket.send(message_to_send) message_to_send = None await unblock(websocket_receive, websocket) except: websocket.close() continue websocket.close() async def main(): print(gc.mem_free()) # Create a service and a characteristic for the motor control motor_service = aioble.Service(MOTOR_SERVICE_UUID) motor_char = aioble.Characteristic(motor_service, MOTOR_CHAR_UUID, read=True, write=True, initial=MOTOR_IDLE) # Register the service with aioble aioble.register_services(motor_service) tasks = [ asyncio.create_task(checkMotorCharValue(motor_char)), asyncio.create_task(advertise_and_wait()), asyncio.create_task(websocket_loop()), ] await asyncio.gather(*tasks) asyncio.run(main())