Untitled

 avatar
unknown
plain_text
a year ago
5.7 kB
0
Indexable
import time
from threading import Event
import concurrent
from concurrent.futures import ThreadPoolExecutor
from random import randint
from datetime import datetime, timedelta

import pytest

from Utils.KrakenOrderHelper.express_whs import Warehouse_49113


@pytest.fixture(scope="session")
def random_client_user_id():
    yield randint(1000000, 9999999)


def update_posting_timeslot(db, posting_number: str, is_express=0):
    if is_express:
        new_timeslot = (datetime.utcnow() + timedelta(hours=6.5)).strftime("%Y-%m-%d %H:00:00.") + '000'
    else:
        new_timeslot = (datetime.utcnow() + timedelta(hours=7)).strftime("%Y-%m-%d %H:00:00.") + '000'
    print(posting_number)
    db.rezon.update_posting_timeslot_by_posting_number(posting_number, new_timeslot)


@pytest.fixture()
def get_ts_for_pvz(api):
    def get_ts_for_pvz(clearing_id: int):
        dv = api.http.lms_qa_admin.get_delivery_variants(page_number=1, page_size=500, sc=clearing_id,
                                                         delivery_variant_type_id=52895552000)["data"][
            0]["id"]  # самовывоз
        timeslot = api.http.lms_qa_admin.get_delivery_variants_time_slots(dv)
        print(timeslot["timeSlots"][0])
        return timeslot["timeSlots"][0]

    return get_ts_for_pvz


@pytest.fixture()
def create_order(api, db):
    def create_order(wh_rezon_id=None, item_id=None, client_id=None, delivery_type=None, with_bag=None,
                     postings_qty=None, item_qty=None, timeslot_id=None):
        posting_number = api.http.oe_qa_helper.create_oms_order(wh_rezon_id=wh_rezon_id,
                                                                item_id=item_id,
                                                                delivery_type=delivery_type,
                                                                timeslot_id=timeslot_id,
                                                                with_bag=with_bag,
                                                                client_id=client_id,
                                                                item_qty=item_qty,
                                                                postings_qty=postings_qty)
        order = db.rezon.get_posting_state_id(str(posting_number[0]))
        return order

    return create_order


@pytest.fixture()
def get_timeslot_id(api, db):
    def get_timeslot_id(ts_type: str, wh_rezon_id: int):
        return api.http.oe_qa_helper.get_timeslot_for_wh(wh_rezon_id=wh_rezon_id, ts_type=ts_type)

    return get_timeslot_id


def wait_for_batching_task(db, posting_id, timeout=500, interval=0.003):  # Интервал в секундах
    start_time = time.time()
    print('жду')

    with ThreadPoolExecutor(max_workers=10) as executor:
        event = Event()
        while time.time() - start_time < timeout:
            res = executor.submit(db.rezon.get_batching_task_by_posting_id, posting_id).result()
            if res and len(res) > 0:
                return res
            event.wait(timeout=interval)  # Используем event.wait вместо sleep для кратковременного ожидания
    return None


def get_timeslots(wh, get_timeslot_id, get_ts_for_pvz):
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        future_day_to_day_timeslot = executor.submit(get_timeslot_id, ts_type="regular", wh_rezon_id=wh.rezon_id)
        future_express_timeslot = executor.submit(get_timeslot_id, ts_type="express", wh_rezon_id=wh.rezon_id)
        future_pvz_timeslot = executor.submit(get_ts_for_pvz, wh.clearing_id)

        day_to_day_timeslot = future_day_to_day_timeslot.result()
        express_timeslot = future_express_timeslot.result()
        pvz_timeslot = future_pvz_timeslot.result()

        return day_to_day_timeslot, express_timeslot, pvz_timeslot


class TestRegressBatching:
    def test_positive_path(api, db, create_order, random_client_user_id, get_timeslot_id, get_ts_for_pvz):
        wh = Warehouse_49113
        day_to_day_timeslot, express_timeslot, pvz_timeslot = get_timeslots(wh, get_timeslot_id, get_ts_for_pvz)
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            future_orders = [
                executor.submit(lambda: create_order(
                    wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_pvz",
                    client_id=random_client_user_id, with_bag=False,
                    timeslot_id=pvz_timeslot['id'], item_qty=1)),
                executor.submit(lambda: create_order(
                    wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_day_to_day",
                    client_id=random_client_user_id + 1, with_bag=False,
                    timeslot_id=day_to_day_timeslot['id'], item_qty=1)),
                executor.submit(lambda: create_order(
                    wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_express",
                    client_id=random_client_user_id + 2, with_bag=True,
                    timeslot_id=express_timeslot['id'], item_qty=1)),
                executor.submit(lambda: create_order(
                    wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_countrywide",
                    client_id=random_client_user_id + 3, with_bag=False,
                    timeslot_id=pvz_timeslot['id'], item_qty=1))
            ]

            orders_results = [future.result() for future in future_orders]
            for order_result in orders_results:
                print(order_result)
                for posting in order_result:
                    res = wait_for_batching_task(db, posting["ID"])
                    assert res
Leave a Comment