Untitled
unknown
plain_text
a year ago
5.1 kB
1
Indexable
"""Regression test scaffolding for order batching.

Provides pytest fixtures that create orders through the QA helper API,
a polling helper that waits for a batching task to show up in the
database, and the ``TestRegressBatching`` test class.
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from random import randint

import pytest

from Utils.KrakenOrderHelper.express_whs import Warehouse_49113
# NOTE(review): the name imported here is shadowed by the local
# ``update_posting_timeslot`` defined below, so only the local version is
# ever called. The import is kept in case importing the module matters;
# drop one of the two once that is confirmed.
from tests.batching.test_mono import update_posting_timeslot

# Delivery variant type used when requesting pickup-point delivery variants
# (the original comment labelled it "self-pickup").
PICKUP_DELIVERY_VARIANT_TYPE_ID = 52895552000

# Item ordered by every posting created in the positive-path test.
TEST_ITEM_ID = 163275485


@pytest.fixture(scope="session")
def random_client_user_id():
    """Yield one random 7-digit client id shared by the whole session."""
    yield randint(1000000, 9999999)


def update_posting_timeslot(db, posting_number: str, is_express=0):
    """Move a posting's timeslot a few hours into the future.

    The new timeslot is the current UTC time plus 6.5 hours (express) or
    7 hours (everything else), truncated to the start of the hour by the
    ``%H:00:00`` format.

    Args:
        db: Database facade exposing
            ``rezon.update_posting_timeslot_by_posting_number``.
        posting_number: Posting whose timeslot is rewritten.
        is_express: Truthy for express postings.
    """
    offset_hours = 6.5 if is_express else 7
    slot_start = datetime.now(timezone.utc) + timedelta(hours=offset_hours)
    new_timeslot = slot_start.strftime("%Y-%m-%d %H:00:00.000")
    print(posting_number)
    db.rezon.update_posting_timeslot_by_posting_number(posting_number, new_timeslot)


@pytest.fixture()
def get_ts_for_pvz(api):
    """Return a coroutine function fetching the first pickup delivery variant.

    NOTE(review): the inner coroutine contains no ``await``; the API call
    blocks, so ``asyncio.gather`` over these coroutines runs them one after
    another rather than concurrently.
    """

    async def _get_ts_for_pvz(clearing_id: int):
        response = api.http.lms_qa_admin.get_delivery_variants(
            page_number=1,
            page_size=500,
            sc=clearing_id,
            delivery_variant_type_id=PICKUP_DELIVERY_VARIANT_TYPE_ID,
        )
        return response["data"][0]

    return _get_ts_for_pvz


@pytest.fixture()
def create_order(api, db):
    """Return a coroutine function that creates an order and shifts its timeslot.

    The returned coroutine creates the order through ``oe_qa_helper``, takes
    the first posting number from the response, looks the posting up in the
    database, rewrites its timeslot via ``update_posting_timeslot`` and
    returns the database lookup result.
    """

    async def _create_order(
        wh_rezon_id=None,
        item_id=None,
        client_id=None,
        delivery_type=None,
        with_bag=None,
        postings_qty=None,
        item_qty=None,
        timeslot_id=None,
    ):
        posting_number = api.http.oe_qa_helper.create_oms_order(
            wh_rezon_id=wh_rezon_id,
            item_id=item_id,
            delivery_type=delivery_type,
            timeslot_id=timeslot_id,
            with_bag=with_bag,
            client_id=client_id,
            item_qty=item_qty,
            postings_qty=postings_qty,
        )[0]
        order = db.rezon.get_posting_state_id(str(posting_number))
        print(order)
        is_express = 1 if delivery_type == "mono_express" else 0
        update_posting_timeslot(db, order[0]["PostingNumber"], is_express)
        return order

    return _create_order


@pytest.fixture()
def get_timeslot_id(api, db):
    """Return a coroutine function that asks the QA helper for a timeslot."""

    async def _get_timeslot_id(ts_type: str, wh_rezon_id: int):
        return api.http.oe_qa_helper.get_timeslot_for_wh(
            wh_rezon_id=wh_rezon_id, ts_type=ts_type
        )

    return _get_timeslot_id


async def wait_for_batching_task(db, posting_id, timeout=500, interval=3):
    """Poll the database until a batching task exists for ``posting_id``.

    The blocking query runs in a worker thread so the event loop stays
    responsive between polls.

    Args:
        db: Database facade exposing ``rezon.get_batching_task_by_posting_id``.
        posting_id: Identifier passed to the query.
        timeout: Maximum time to keep polling, in seconds of event-loop time.
        interval: Pause between polls, in seconds.

    Returns:
        The first truthy query result, or ``None`` when ``timeout`` elapses.
    """
    loop = asyncio.get_running_loop()
    started_at = loop.time()
    print('жду')
    # Queries are issued strictly one at a time, so one worker is enough.
    with ThreadPoolExecutor(max_workers=1) as executor:
        while loop.time() - started_at < timeout:
            res = await loop.run_in_executor(
                executor, db.rezon.get_batching_task_by_posting_id, posting_id
            )
            print(res)
            if res:
                return res
            await asyncio.sleep(interval)
    return None


async def get_timeslots(wh, get_timeslot_id, get_ts_for_pvz):
    """Fetch the regular, express and pickup timeslots for a warehouse.

    Returns:
        A list ``[regular, express, pickup]`` in that order.
    """
    return await asyncio.gather(
        get_timeslot_id(ts_type="regular", wh_rezon_id=wh.rezon_id),
        get_timeslot_id(ts_type="express", wh_rezon_id=wh.rezon_id),
        get_ts_for_pvz(wh.clearing_id),
    )


class TestRegressBatching:
    """Regression scenarios for order batching."""

    @pytest.mark.asyncio
    async def test_positive_path(
        self,
        api,
        db,
        create_order,
        random_client_user_id,
        get_timeslot_id,
        get_ts_for_pvz,
    ):
        """Create one order per delivery type for a single warehouse."""
        wh = Warehouse_49113
        day_to_day_timeslot, express_timeslot, pvz_timeslot = await get_timeslots(
            wh, get_timeslot_id, get_ts_for_pvz
        )

        # NOTE(review): the express order receives the "regular" timeslot and
        # the day-to-day order receives the "express" one. This looks swapped;
        # it is preserved as written until the intended pairing is confirmed.
        orders = await asyncio.gather(
            create_order(
                wh_rezon_id=wh.rezon_id,
                item_id=TEST_ITEM_ID,
                delivery_type="mono_express",
                client_id=random_client_user_id,
                with_bag=True,
                timeslot_id=day_to_day_timeslot["id"],
            ),
            create_order(
                wh_rezon_id=wh.rezon_id,
                item_id=TEST_ITEM_ID,
                delivery_type="mono_day_to_day",
                client_id=random_client_user_id + 1,
                with_bag=True,
                timeslot_id=express_timeslot["id"],
            ),
            create_order(
                wh_rezon_id=wh.rezon_id,
                item_id=TEST_ITEM_ID,
                delivery_type="mono_pvz",
                client_id=random_client_user_id + 2,
                with_bag=False,
                timeslot_id=pvz_timeslot["id"],
            ),
            create_order(
                wh_rezon_id=wh.rezon_id,
                item_id=TEST_ITEM_ID,
                delivery_type="mono_countrywide",
                client_id=random_client_user_id + 3,
                with_bag=False,
                timeslot_id=pvz_timeslot["id"],
            ),
        )
        print(orders)
        # TODO: this test makes no assertions yet. The earlier draft intended
        # to call wait_for_batching_task(db, posting["ID"]) for every created
        # posting and assert that a batching task was found.
Editor is loading...
Leave a Comment