# Untitled
#
# mail@pastecode.io avatar
# unknown
# python
# a year ago
# 4.5 kB
# 102
# Indexable
# Never
import time
import math
import uwebsockets.client
from uwebsockets.protocol import ConnectionClosed
from driver import MotorDriver
import network
import usocket as socket
from config import laravel_access_hash, access_hash, ssid, password, ws_protocol, ws_host, ws_port, MOTOR_SERVICE_UUID, MOTOR_CHAR_UUID, MOTOR_IDLE, MOTOR_START, MOTOR_STOP, _ADV_INTERVAL_US
import aioble
import bluetooth
import uasyncio as asyncio
import _thread
from threadsafe import Message
import machine
import utime
import gc

# Shared motor driver instance; motor_start() and motor_stop() drive it.
m = MotorDriver()

# Most recent payload returned by websocket.recv() in websocket_receive();
# set back to None there when the receive fails.
resp = None

# Reply queued for the Laravel backend. websocket_receive() fills it in and
# websocket_loop() sends it on its next connection, then clears it to None.
message_to_send = None

async def unblock(func, *args, **kwargs):
    """Run blocking ``func(*args, **kwargs)`` on a new thread and await it.

    The worker thread hands the function's return value to a ``Message`` via
    ``set()``; this coroutine awaits that message and returns the awaited
    value (presumably whatever was passed to ``set()`` -- confirm against the
    ``threadsafe`` module).
    """
    result_box = Message()

    def _worker(target, box, positional, keyword):
        # Runs on the spawned thread: call the target, then publish its result.
        outcome = target(*positional, **keyword)
        box.set(outcome)

    _thread.start_new_thread(_worker, (func, result_box, args, kwargs))
    return await result_box

# Define an async function to advertise the service and wait for connection
async def advertise_and_wait():
    """Advertise the motor service forever, handling one connection per cycle.

    Each cycle awaits ``aioble.advertise(...)`` to obtain a connection, waits
    for that connection to report disconnection, then calls ``aioble.stop()``
    and sleeps one second before advertising again.

    Errors raised during a cycle are printed and the loop carries on, so
    advertising stays best-effort. Task cancellation is re-raised rather than
    swallowed, so the task can actually be cancelled.
    """
    while True:
        try:
            # Start advertising with the service UUID, name, and appearance.
            connection = await aioble.advertise(
                _ADV_INTERVAL_US,
                name="Aquacrom Device",
                services=[MOTOR_SERVICE_UUID],
                appearance=0x0200,
            )
            await connection.disconnected()
        except asyncio.CancelledError:
            # Never swallow cancellation; the finally clause still runs first.
            raise
        except Exception as exc:
            # Keep advertising, but leave a trace of what went wrong.
            print('advertise_and_wait error:', repr(exc))
        finally:
            aioble.stop()
            await asyncio.sleep(1)

def motor_start():
    """Tell the shared driver ``m`` to run channel 'MD' with 'forward' and 50.

    The trailing 50 is presumably a speed/duty setting -- confirm against
    ``MotorDriver.MotorRun``.
    """
    channel, direction, level = 'MD', 'forward', 50
    m.MotorRun(channel, direction, level)
    
def motor_stop():
    """Tell the shared driver ``m`` to stop channel 'MD'."""
    channel = 'MD'
    m.MotorStop(channel)

async def checkMotorCharValue(motor_char):
    """Poll ``motor_char`` every three seconds and act on motor commands.

    Each pass prints the free heap size, reads the characteristic and compares
    it with the UTF-8 encoded ``MOTOR_START`` / ``MOTOR_STOP`` values. On a
    match the motor is started or stopped and ``MOTOR_IDLE`` is written back;
    any other value is ignored. The loop never returns.
    """
    while True:
        print(gc.mem_free())
        command = motor_char.read()

        # Pick the handler first; None means there is nothing to do this pass.
        if command == MOTOR_START.encode('utf-8'):
            action = motor_start
        elif command == MOTOR_STOP.encode('utf-8'):
            action = motor_stop
        else:
            action = None

        if action is not None:
            action()
            # Reset the characteristic so the same command is not re-run.
            motor_char.write(MOTOR_IDLE)

        await asyncio.sleep(3)

def websocket_receive(websocket):
    """Block for one WebSocket message, act on it, then close the socket.

    The received payload is stored in the global ``resp``. ``START_MOTOR`` and
    ``STOP_MOTOR`` queue an ``OK`` reply for the Laravel backend in the global
    ``message_to_send`` and then start or stop the motor; any other payload is
    left in ``resp`` and otherwise ignored.

    Ordinary errors are handled here rather than raised: on failure ``resp``
    is set to None, and unexpected errors are printed instead of being
    silently discarded. The socket is always closed before returning.

    Args:
        websocket: object providing ``recv()`` and ``close()``.

    Returns:
        None.
    """
    global resp
    global message_to_send
    try:
        resp = websocket.recv()

        if resp == "START_MOTOR":
            action = motor_start
        elif resp == "STOP_MOTOR":
            action = motor_stop
        else:
            action = None

        if action is not None:
            message_to_send = 'send-message-to=' + laravel_access_hash + '&message=OK'
            action()
    except OSError:
        resp = None
    except KeyboardInterrupt:
        resp = None
    except ConnectionClosed:
        resp = None
    except Exception as exc:
        # Unexpected failure: report it instead of letting it vanish.
        resp = None
        print('websocket_receive error:', repr(exc))
    finally:
        # A failing close must not mask the outcome above or escape the caller.
        try:
            websocket.close()
        except Exception as exc:
            print('websocket close error:', repr(exc))
        
def wifi_connect(max_attempts=10, retry_delay_s=1):
    """Join the configured Wi-Fi network, resetting the board on failure.

    Returns immediately when the station interface is already connected.
    Otherwise the interface is activated, a connection to ``ssid`` /
    ``password`` is started, and the link is polled up to ``max_attempts``
    times. Each failed poll prints a dot and sleeps ``retry_delay_s`` seconds.
    If every poll fails, ``machine.reset()`` is called.

    This function blocks with ``utime.sleep`` while it waits.

    Args:
        max_attempts: number of connection polls before resetting.
        retry_delay_s: seconds to sleep after each failed poll.
    """
    sta_if = network.WLAN(network.STA_IF)
    if sta_if.isconnected():
        return

    sta_if.active(True)
    sta_if.connect(ssid, password)

    for _ in range(max_attempts):
        if sta_if.isconnected():
            return
        print('.', end='')
        utime.sleep(retry_delay_s)

    # Every poll failed: reset the board.
    machine.reset()

def _close_quietly(websocket):
    """Close ``websocket`` if there is one, printing (not raising) any error."""
    if websocket is None:
        return
    try:
        websocket.close()
    except Exception as exc:
        print('websocket close error:', repr(exc))


async def websocket_loop(retry_delay_s=1):
    """Keep a WebSocket session to the server going, reconnecting forever.

    After ensuring Wi-Fi is up, each cycle opens a connection, sends any reply
    queued in the global ``message_to_send`` (clearing it only once the send
    succeeded), and then waits for one incoming message by running
    ``websocket_receive`` on a worker thread via ``unblock``.

    When a cycle fails, the error is printed, the socket (if one was opened)
    is closed, and the loop sleeps ``retry_delay_s`` seconds before retrying.
    That sleep also hands control back to the other tasks while the server is
    unreachable. Task cancellation is re-raised after closing the socket.

    Args:
        retry_delay_s: seconds to wait after a failed cycle before retrying.
    """
    global message_to_send

    wifi_connect()

    url = ws_protocol + "://" + ws_host + ":" + str(ws_port) + "/access-hash=" + access_hash

    while True:
        # Reset per cycle so a failed connect() never closes a stale or
        # missing socket.
        websocket = None
        try:
            websocket = uwebsockets.client.connect(url)
            if message_to_send is not None:
                websocket.send(message_to_send)
                message_to_send = None
            await unblock(websocket_receive, websocket)
        except asyncio.CancelledError:
            _close_quietly(websocket)
            raise
        except Exception as exc:
            print('websocket_loop error:', repr(exc))
            _close_quietly(websocket)
            # Back off instead of spinning; this is the only await on the
            # failure path, so it is what lets the other tasks run.
            await asyncio.sleep(retry_delay_s)

async def main():
    """Set up the BLE motor service and run the three background tasks.

    Prints the free heap size, registers one service holding a readable and
    writable characteristic initialised to ``MOTOR_IDLE``, then starts the
    characteristic poller, the BLE advertiser and the WebSocket loop as tasks
    and awaits them together.
    """
    print(gc.mem_free())

    # One service with a single read/write characteristic for motor control.
    service = aioble.Service(MOTOR_SERVICE_UUID)
    characteristic = aioble.Characteristic(
        service,
        MOTOR_CHAR_UUID,
        read=True,
        write=True,
        initial=MOTOR_IDLE,
    )
    aioble.register_services(service)

    coroutines = (
        checkMotorCharValue(characteristic),
        advertise_and_wait(),
        websocket_loop(),
    )
    running = [asyncio.create_task(coro) for coro in coroutines]
    await asyncio.gather(*running)
    
# Script entry point: run main() on the event loop. main() awaits tasks that
# loop forever, so this call is not expected to return.
asyncio.run(main())