Untitled

 avatar
unknown
plain_text
2 years ago
5.7 kB
11
Indexable
import time
from threading import Event
import concurrent
from concurrent.futures import ThreadPoolExecutor
from random import randint
from datetime import datetime, timedelta

import pytest

from Utils.KrakenOrderHelper.express_whs import Warehouse_49113


@pytest.fixture(scope="session")
def random_client_user_id():
    yield randint(1000000, 9999999)


def update_posting_timeslot(db, posting_number: str, is_express=0):
    if is_express:
        new_timeslot = (datetime.utcnow() + timedelta(hours=6.5)).strftime("%Y-%m-%d %H:00:00.") + '000'
    else:
        new_timeslot = (datetime.utcnow() + timedelta(hours=7)).strftime("%Y-%m-%d %H:00:00.") + '000'
    print(posting_number)
    db.rezon.update_posting_timeslot_by_posting_number(posting_number, new_timeslot)


@pytest.fixture()
def get_ts_for_pvz(api):
    def get_ts_for_pvz(clearing_id: int):
        dv = api.http.lms_qa_admin.get_delivery_variants(page_number=1, page_size=500, sc=clearing_id,
                                                         delivery_variant_type_id=52895552000)["data"][
            0]["id"]  # самовывоз
        timeslot = api.http.lms_qa_admin.get_delivery_variants_time_slots(dv)
        print(timeslot["timeSlots"][0])
        return timeslot["timeSlots"][0]

    return get_ts_for_pvz


@pytest.fixture()
def create_order(api, db):
    def create_order(wh_rezon_id=None, item_id=None, client_id=None, delivery_type=None, with_bag=None,
                     postings_qty=None, item_qty=None, timeslot_id=None):
        posting_number = api.http.oe_qa_helper.create_oms_order(wh_rezon_id=wh_rezon_id,
                                                                item_id=item_id,
                                                                delivery_type=delivery_type,
                                                                timeslot_id=timeslot_id,
                                                                with_bag=with_bag,
                                                                client_id=client_id,
                                                                item_qty=item_qty,
                                                                postings_qty=postings_qty)
        order = db.rezon.get_posting_state_id(str(posting_number[0]))
        return order

    return create_order


@pytest.fixture()
def get_timeslot_id(api, db):
    def get_timeslot_id(ts_type: str, wh_rezon_id: int):
        return api.http.oe_qa_helper.get_timeslot_for_wh(wh_rezon_id=wh_rezon_id, ts_type=ts_type)

    return get_timeslot_id


def wait_for_batching_task(db, posting_id, timeout=500, interval=0.003):  # Интервал в секундах
    start_time = time.time()
    print('жду')

    with ThreadPoolExecutor(max_workers=10) as executor:
        event = Event()
        while time.time() - start_time < timeout:
            res = executor.submit(db.rezon.get_batching_task_by_posting_id, posting_id).result()
            if res and len(res) > 0:
                return res
            event.wait(timeout=interval)  # Используем event.wait вместо sleep для кратковременного ожидания
    return None


def get_timeslots(wh, get_timeslot_id, get_ts_for_pvz):
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        future_day_to_day_timeslot = executor.submit(get_timeslot_id, ts_type="regular", wh_rezon_id=wh.rezon_id)
        future_express_timeslot = executor.submit(get_timeslot_id, ts_type="express", wh_rezon_id=wh.rezon_id)
        future_pvz_timeslot = executor.submit(get_ts_for_pvz, wh.clearing_id)

        day_to_day_timeslot = future_day_to_day_timeslot.result()
        express_timeslot = future_express_timeslot.result()
        pvz_timeslot = future_pvz_timeslot.result()

        return day_to_day_timeslot, express_timeslot, pvz_timeslot


class TestRegressBatching:
    def test_positive_path(api, db, create_order, random_client_user_id, get_timeslot_id, get_ts_for_pvz):
        wh = Warehouse_49113
        day_to_day_timeslot, express_timeslot, pvz_timeslot = get_timeslots(wh, get_timeslot_id, get_ts_for_pvz)
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            future_orders = [
                executor.submit(lambda: create_order(
                    wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_pvz",
                    client_id=random_client_user_id, with_bag=False,
                    timeslot_id=pvz_timeslot['id'], item_qty=1)),
                executor.submit(lambda: create_order(
                    wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_day_to_day",
                    client_id=random_client_user_id + 1, with_bag=False,
                    timeslot_id=day_to_day_timeslot['id'], item_qty=1)),
                executor.submit(lambda: create_order(
                    wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_express",
                    client_id=random_client_user_id + 2, with_bag=True,
                    timeslot_id=express_timeslot['id'], item_qty=1)),
                executor.submit(lambda: create_order(
                    wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_countrywide",
                    client_id=random_client_user_id + 3, with_bag=False,
                    timeslot_id=pvz_timeslot['id'], item_qty=1))
            ]

            orders_results = [future.result() for future in future_orders]
            for order_result in orders_results:
                print(order_result)
                for posting in order_result:
                    res = wait_for_batching_task(db, posting["ID"])
                    assert res
Editor is loading...
Leave a Comment