Untitled
import concurrent
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from random import randint
from threading import Event

import pytest

from Utils.KrakenOrderHelper.express_whs import Warehouse_49113


@pytest.fixture(scope="session")
def random_client_user_id():
    """Provide one random 7-digit client user id for the whole test session."""
    return randint(1000000, 9999999)


def update_posting_timeslot(db, posting_number: str, is_express=0):
    """Move a posting's timeslot a few hours ahead of the current UTC time.

    The new timeslot is "now" (UTC) plus 6.5 hours for express postings or
    7 hours otherwise, truncated to the start of that hour and formatted as
    ``YYYY-MM-DD HH:00:00.000``.

    Args:
        db: Database helper exposing
            ``rezon.update_posting_timeslot_by_posting_number``.
        posting_number: Number of the posting to update.
        is_express: Truthy to use the express offset (6.5 h instead of 7 h).
    """
    offset_hours = 6.5 if is_express else 7
    # Timezone-aware replacement for the deprecated ``datetime.utcnow()``;
    # the formatted string is the same.
    shifted = datetime.now(timezone.utc) + timedelta(hours=offset_hours)
    new_timeslot = shifted.strftime("%Y-%m-%d %H:00:00.000")
    print(posting_number)
    db.rezon.update_posting_timeslot_by_posting_number(posting_number, new_timeslot)


@pytest.fixture()
def get_ts_for_pvz(api):
    """Provide a callable returning the first time slot for a clearing id."""

    def _get_ts_for_pvz(clearing_id: int):
        # Take the id of the first delivery variant returned for this
        # clearing id and delivery variant type.
        delivery_variant_id = api.http.lms_qa_admin.get_delivery_variants(
            page_number=1,
            page_size=500,
            sc=clearing_id,
            delivery_variant_type_id=52895552000,
        )["data"][0]["id"]  # self-pickup
        timeslot = api.http.lms_qa_admin.get_delivery_variants_time_slots(delivery_variant_id)
        first_slot = timeslot["timeSlots"][0]
        print(first_slot)
        return first_slot

    return _get_ts_for_pvz


@pytest.fixture()
def create_order(api, db):
    """Provide a callable that creates an OMS order and looks it up in the DB.

    The callable forwards its keyword arguments to
    ``api.http.oe_qa_helper.create_oms_order`` and returns the result of
    ``db.rezon.get_posting_state_id`` for the first returned posting number.
    """

    def _create_order(wh_rezon_id=None, item_id=None, client_id=None, delivery_type=None,
                      with_bag=None, postings_qty=None, item_qty=None, timeslot_id=None):
        posting_numbers = api.http.oe_qa_helper.create_oms_order(
            wh_rezon_id=wh_rezon_id,
            item_id=item_id,
            delivery_type=delivery_type,
            timeslot_id=timeslot_id,
            with_bag=with_bag,
            client_id=client_id,
            item_qty=item_qty,
            postings_qty=postings_qty,
        )
        # Only the first posting number is used for the lookup.
        return db.rezon.get_posting_state_id(str(posting_numbers[0]))

    return _create_order


@pytest.fixture()
def get_timeslot_id(api, db):
    """Provide a callable that fetches a timeslot of a given type for a warehouse."""

    def _get_timeslot_id(ts_type: str, wh_rezon_id: int):
        return api.http.oe_qa_helper.get_timeslot_for_wh(wh_rezon_id=wh_rezon_id, ts_type=ts_type)

    return _get_timeslot_id


def wait_for_batching_task(db, posting_id, timeout=500, interval=0.003):
    """Poll the database until a batching task appears for a posting.

    Args:
        db: Database helper exposing ``rezon.get_batching_task_by_posting_id``.
        posting_id: Id of the posting to look up.
        timeout: Maximum time to keep polling, in seconds.
        interval: Pause between two polls, in seconds.

    Returns:
        The first truthy query result, or ``None`` if nothing was found
        before the timeout expired.
    """
    # Monotonic clock: the deadline is unaffected by system clock changes.
    deadline = time.monotonic() + timeout
    print('жду')
    # The event is never set, so ``wait`` just blocks for ``interval`` seconds.
    pause = Event()
    while time.monotonic() < deadline:
        # Queried directly: submitting to a thread pool and immediately
        # blocking on the result gave no concurrency, only overhead.
        res = db.rezon.get_batching_task_by_posting_id(posting_id)
        if res:
            return res
        pause.wait(timeout=interval)
    return None


def get_timeslots(wh, get_timeslot_id, get_ts_for_pvz):
    """Fetch the regular, express and PVZ timeslots for a warehouse in parallel.

    Args:
        wh: Warehouse object providing ``rezon_id`` and ``clearing_id``.
        get_timeslot_id: Callable accepting ``ts_type`` and ``wh_rezon_id``.
        get_ts_for_pvz: Callable accepting a clearing id.

    Returns:
        A tuple ``(day_to_day_timeslot, express_timeslot, pvz_timeslot)``.
    """
    with ThreadPoolExecutor(max_workers=3) as executor:
        regular_future = executor.submit(get_timeslot_id, ts_type="regular", wh_rezon_id=wh.rezon_id)
        express_future = executor.submit(get_timeslot_id, ts_type="express", wh_rezon_id=wh.rezon_id)
        pvz_future = executor.submit(get_ts_for_pvz, wh.clearing_id)
        return regular_future.result(), express_future.result(), pvz_future.result()


class TestRegressBatching:
    """Regression tests for batching of newly created orders."""

    def test_positive_path(self, api, db, create_order, random_client_user_id, get_timeslot_id,
                           get_ts_for_pvz):
        """Create four orders of different delivery types and expect a batching task for each posting."""
        wh = Warehouse_49113
        day_to_day_timeslot, express_timeslot, pvz_timeslot = get_timeslots(
            wh, get_timeslot_id, get_ts_for_pvz)
        item_id = 820683186

        # (delivery_type, client id offset, with_bag, timeslot) for each order.
        order_specs = [
            ("mono_pvz", 0, False, pvz_timeslot),
            ("mono_day_to_day", 1, False, day_to_day_timeslot),
            ("mono_express", 2, True, express_timeslot),
            ("mono_countrywide", 3, False, pvz_timeslot),
        ]

        with ThreadPoolExecutor(max_workers=4) as executor:
            future_orders = [
                executor.submit(
                    create_order,
                    wh_rezon_id=wh.rezon_id,
                    item_id=item_id,
                    delivery_type=delivery_type,
                    client_id=random_client_user_id + client_offset,
                    with_bag=with_bag,
                    timeslot_id=timeslot['id'],
                    item_qty=1,
                )
                for delivery_type, client_offset, with_bag, timeslot in order_specs
            ]
            orders_results = [future.result() for future in future_orders]

        for order_result in orders_results:
            print(order_result)
            for posting in order_result:
                res = wait_for_batching_task(db, posting["ID"])
                assert res, f"no batching task found for posting {posting['ID']}"
Leave a Comment