Untitled

mail@pastecode.io avatar
unknown
python
a year ago
4.5 kB
104
Indexable
import time
import math
import uwebsockets.client
from uwebsockets.protocol import ConnectionClosed
from driver import MotorDriver
import network
import usocket as socket
from config import laravel_access_hash, access_hash, ssid, password, ws_protocol, ws_host, ws_port, MOTOR_SERVICE_UUID, MOTOR_CHAR_UUID, MOTOR_IDLE, MOTOR_START, MOTOR_STOP, _ADV_INTERVAL_US
import aioble
import bluetooth
import uasyncio as asyncio
import _thread
from threadsafe import Message
import machine
import utime
import gc

m = MotorDriver()

#Websocket Received Data
resp = None

#Websocket Data to Send Back To Laravel
message_to_send = None

async def unblock(func, *args, **kwargs):
    def wrap(func, message, args, kwargs):
        message.set(func(*args, **kwargs))  # Run the blocking function.
    msg = Message()
    _thread.start_new_thread(wrap, (func, msg, args, kwargs))
    return await msg

# Define an async function to advertise the service and wait for connection
async def advertise_and_wait():
    while True:
        try:
            # Start advertising with the service UUID, name, and appearance
            connection = await aioble.advertise(
                _ADV_INTERVAL_US,
                name="Aquacrom Device",
                services=[MOTOR_SERVICE_UUID],
                appearance=0x0200,
            )
            await connection.disconnected()
        except:
            pass
        finally:
            aioble.stop()
            await asyncio.sleep(1)

def motor_start():
    m.MotorRun('MD', 'forward', 50)
    
def motor_stop():
    m.MotorStop('MD')

async def checkMotorCharValue(motor_char):
    while True:
        print(gc.mem_free())
        read_value = motor_char.read()
        if read_value == MOTOR_START.encode('utf-8'):
            motor_start()
            motor_char.write(MOTOR_IDLE)
        elif read_value == MOTOR_STOP.encode('utf-8'):
            motor_stop()
            motor_char.write(MOTOR_IDLE)
        else:
            pass
        await asyncio.sleep(3)

def websocket_receive(websocket):
    global resp
    global message_to_send
    try:
        resp = websocket.recv()
        
        if resp == "START_MOTOR":
            message_to_send = 'send-message-to=' + laravel_access_hash + '&message=OK'
            #websocket.close()
            motor_start()

        if resp == "STOP_MOTOR":
            message_to_send = 'send-message-to=' + laravel_access_hash + '&message=OK'
            #websocket.close()
            motor_stop()
            
    except ConnectionClosed:
        resp = None
    except KeyboardInterrupt:
        resp = None
    except OSError as e:
        resp = None
    finally:
        websocket.close()
        return
        
def wifi_connect():
 #  connect to the internet
    sta_if = network.WLAN(network.STA_IF)
    count = 0

    if not sta_if.isconnected():
        sta_if.active(True)
        sta_if.connect(ssid, password)

        while (count < 10):
            count += 1
            status = sta_if.status()
            if (sta_if.isconnected()):
                count = 0
                break

            print('.', end = '')
            utime.sleep(1)

    if (count == 10):
        count = 0
        machine.reset()

async def websocket_loop():
    global message_to_send
    
    wifi_connect()

    websocket = None

    while True:
        try:
            websocket = uwebsockets.client.connect(ws_protocol + "://" + ws_host + ":" + str(ws_port) + "/access-hash=" + access_hash)
            if message_to_send is not None:
                websocket.send(message_to_send)
                message_to_send = None
            await unblock(websocket_receive, websocket)
        except:
            websocket.close()
            continue

    websocket.close()

async def main():
    print(gc.mem_free())
    
    # Create a service and a characteristic for the motor control
    motor_service = aioble.Service(MOTOR_SERVICE_UUID)
    motor_char = aioble.Characteristic(motor_service, MOTOR_CHAR_UUID, read=True, write=True, initial=MOTOR_IDLE)

    # Register the service with aioble
    aioble.register_services(motor_service)
    
    tasks = [
        asyncio.create_task(checkMotorCharValue(motor_char)),
        asyncio.create_task(advertise_and_wait()),
        asyncio.create_task(websocket_loop()),
    ]
    await asyncio.gather(*tasks)
    
asyncio.run(main())