Untitled

mail@pastecode.io avatar
unknown
plain_text
18 days ago
5.1 kB
1
Indexable
Never
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from random import randint

import pytest

from Utils.KrakenOrderHelper.express_whs import Warehouse_49113
from tests.batching.test_mono import update_posting_timeslot


@pytest.fixture(scope="session")
def random_client_user_id():
    """Yield one random seven-digit client user id.

    The fixture is declared with session scope, so the number is drawn once
    and shared by every test that requests it during the run.
    """
    lowest_id, highest_id = 1000000, 9999999
    session_client_id = randint(lowest_id, highest_id)
    yield session_client_id


def update_posting_timeslot(db, posting_number: str, is_express=0):
    """Move a posting's timeslot to the start of an upcoming UTC hour.

    The new timeslot is the current UTC time plus an offset (6.5 hours when
    ``is_express`` is truthy, 7 hours otherwise), truncated to the start of
    that hour and rendered as ``YYYY-MM-DD HH:00:00.000``.

    Args:
        db: Object exposing ``rezon.update_posting_timeslot_by_posting_number``.
        posting_number: Posting to update; it is also printed to stdout.
        is_express: Truthy selects the 6.5-hour offset instead of 7 hours.
    """
    offset = timedelta(hours=6.5 if is_express else 7)
    # datetime.utcnow() is deprecated since Python 3.12. An aware UTC "now"
    # renders the same string because the format has no timezone field.
    slot_start = datetime.now(timezone.utc) + offset
    new_timeslot = slot_start.strftime("%Y-%m-%d %H:00:00.") + '000'
    print(posting_number)
    db.rezon.update_posting_timeslot_by_posting_number(posting_number, new_timeslot)


@pytest.fixture()
def get_ts_for_pvz(api):
    """Provide a coroutine function returning the first delivery variant for a clearing id.

    The returned callable takes ``clearing_id`` and queries
    ``api.http.lms_qa_admin.get_delivery_variants`` with that id as ``sc``.
    """

    async def _first_pickup_variant(clearing_id: int):
        # The body contains no ``await``: the api helper is called synchronously
        # once this coroutine is driven.
        response = api.http.lms_qa_admin.get_delivery_variants(
            page_number=1,
            page_size=500,
            sc=clearing_id,
            delivery_variant_type_id=52895552000,  # self-pickup
        )
        variants = response["data"]
        return variants[0]

    return _first_pickup_variant


@pytest.fixture()
def create_order(api, db):
    """Provide a coroutine function that creates an OMS order and reschedules its posting.

    The returned callable creates the order through ``api.http.oe_qa_helper``,
    reads it back with ``db.rezon.get_posting_state_id``, prints it, updates
    the timeslot of the first returned posting and returns the database result.
    """

    async def _create(wh_rezon_id=None, item_id=None, client_id=None, delivery_type=None, with_bag=None,
                      postings_qty=None, item_qty=None, timeslot_id=None):
        created = api.http.oe_qa_helper.create_oms_order(
            wh_rezon_id=wh_rezon_id,
            item_id=item_id,
            delivery_type=delivery_type,
            timeslot_id=timeslot_id,
            with_bag=with_bag,
            client_id=client_id,
            item_qty=item_qty,
            postings_qty=postings_qty,
        )
        # Only the first element of the helper's response is used.
        first_posting_number = created[0]
        order = db.rezon.get_posting_state_id(str(first_posting_number))
        print(order)
        # "mono_express" orders get the express offset when the timeslot is moved.
        express_flag = int(delivery_type == "mono_express")
        update_posting_timeslot(db, order[0]["PostingNumber"], express_flag)
        return order

    return _create


@pytest.fixture()
def get_timeslot_id(api, db):
    """Provide a coroutine function that looks up a warehouse timeslot by type.

    The ``db`` fixture is requested but not used by this body.
    """

    async def _lookup_timeslot(ts_type: str, wh_rezon_id: int):
        helper = api.http.oe_qa_helper
        timeslot = helper.get_timeslot_for_wh(wh_rezon_id=wh_rezon_id, ts_type=ts_type)
        return timeslot

    return _lookup_timeslot


async def wait_for_batching_task(db, posting_id, timeout=500, interval=3):
    """Poll the database until a batching task exists for ``posting_id``.

    Each poll calls ``db.rezon.get_batching_task_by_posting_id(posting_id)``
    in a worker thread (presumably because the call blocks), prints the
    result, and sleeps ``interval`` seconds before the next attempt.

    Args:
        db: Object exposing ``rezon.get_batching_task_by_posting_id``.
        posting_id: Identifier passed to the query on every poll.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between polls.

    Returns:
        The first truthy query result, or ``None`` when the timeout elapses
        first. With ``timeout <= 0`` the database is never queried.
    """
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; get_event_loop() is meant for non-async callers.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    print('жду')
    # Polls are strictly sequential, so one worker thread is enough. The
    # context manager shuts the pool down on every exit path.
    with ThreadPoolExecutor(max_workers=1) as executor:
        while loop.time() < deadline:
            res = await loop.run_in_executor(executor, db.rezon.get_batching_task_by_posting_id, posting_id)
            print(res)
            if res:
                return res
            await asyncio.sleep(interval)
    return None


async def get_timeslots(wh, get_timeslot_id, get_ts_for_pvz):
    """Gather the regular, express and pickup-point timeslots for a warehouse.

    Args:
        wh: Object with ``rezon_id`` and ``clearing_id`` attributes.
        get_timeslot_id: Coroutine function taking ``ts_type`` and ``wh_rezon_id``.
        get_ts_for_pvz: Coroutine function taking a clearing id.

    Returns:
        A list of three results in the order regular, express, pickup point.
    """
    pending = [
        get_timeslot_id(ts_type=slot_kind, wh_rezon_id=wh.rezon_id)
        for slot_kind in ("regular", "express")
    ]
    pending.append(get_ts_for_pvz(wh.clearing_id))
    return await asyncio.gather(*pending)


class TestRegressBatching:
    """Regression scenarios for order batching."""

    @pytest.mark.asyncio
    async def test_positive_path(self, api, db, create_order, random_client_user_id, get_timeslot_id,
                                 get_ts_for_pvz):
        """Create one order per delivery type for ``Warehouse_49113`` and print them.

        ``self`` is declared explicitly: without it the first parameter
        (``api``) was bound to the test-class instance instead of the fixture.
        """
        wh = Warehouse_49113
        item_id = 163275485

        day_to_day_timeslot, express_timeslot, pvz_timeslot = await get_timeslots(wh, get_timeslot_id, get_ts_for_pvz)

        # Each delivery type is paired with the timeslot fetched for it; the
        # express and day-to-day ids were previously swapped.
        # NOTE(review): the countrywide order reuses the pickup-point timeslot,
        # as in the original - confirm this is intended.
        orders = await asyncio.gather(
            create_order(wh_rezon_id=wh.rezon_id, item_id=item_id,
                         delivery_type="mono_express", client_id=random_client_user_id, with_bag=True,
                         timeslot_id=express_timeslot["id"]),
            create_order(wh_rezon_id=wh.rezon_id, item_id=item_id,
                         delivery_type="mono_day_to_day", client_id=random_client_user_id + 1, with_bag=True,
                         timeslot_id=day_to_day_timeslot["id"]),
            create_order(wh_rezon_id=wh.rezon_id, item_id=item_id,
                         delivery_type="mono_pvz", client_id=random_client_user_id + 2, with_bag=False,
                         timeslot_id=pvz_timeslot["id"]),
            create_order(wh_rezon_id=wh.rezon_id, item_id=item_id,
                         delivery_type="mono_countrywide", client_id=random_client_user_id + 3, with_bag=False,
                         timeslot_id=pvz_timeslot["id"]),
        )
        print(orders)
        # TODO: this test asserts nothing yet. The removed draft suggested
        # waiting for a batching task per created posting, roughly
        # ``assert await wait_for_batching_task(db, posting["ID"])`` for every
        # posting in every created order - confirm the expected outcome for
        # each delivery type before enabling it.
Leave a Comment