Untitled
unknown
plain_text
2 years ago
5.7 kB
11
Indexable
import time
from threading import Event
import concurrent
from concurrent.futures import ThreadPoolExecutor
from random import randint
from datetime import datetime, timedelta
import pytest
from Utils.KrakenOrderHelper.express_whs import Warehouse_49113
@pytest.fixture(scope="session")
def random_client_user_id():
yield randint(1000000, 9999999)
def update_posting_timeslot(db, posting_number: str, is_express=0):
if is_express:
new_timeslot = (datetime.utcnow() + timedelta(hours=6.5)).strftime("%Y-%m-%d %H:00:00.") + '000'
else:
new_timeslot = (datetime.utcnow() + timedelta(hours=7)).strftime("%Y-%m-%d %H:00:00.") + '000'
print(posting_number)
db.rezon.update_posting_timeslot_by_posting_number(posting_number, new_timeslot)
@pytest.fixture()
def get_ts_for_pvz(api):
def get_ts_for_pvz(clearing_id: int):
dv = api.http.lms_qa_admin.get_delivery_variants(page_number=1, page_size=500, sc=clearing_id,
delivery_variant_type_id=52895552000)["data"][
0]["id"] # самовывоз
timeslot = api.http.lms_qa_admin.get_delivery_variants_time_slots(dv)
print(timeslot["timeSlots"][0])
return timeslot["timeSlots"][0]
return get_ts_for_pvz
@pytest.fixture()
def create_order(api, db):
def create_order(wh_rezon_id=None, item_id=None, client_id=None, delivery_type=None, with_bag=None,
postings_qty=None, item_qty=None, timeslot_id=None):
posting_number = api.http.oe_qa_helper.create_oms_order(wh_rezon_id=wh_rezon_id,
item_id=item_id,
delivery_type=delivery_type,
timeslot_id=timeslot_id,
with_bag=with_bag,
client_id=client_id,
item_qty=item_qty,
postings_qty=postings_qty)
order = db.rezon.get_posting_state_id(str(posting_number[0]))
return order
return create_order
@pytest.fixture()
def get_timeslot_id(api, db):
def get_timeslot_id(ts_type: str, wh_rezon_id: int):
return api.http.oe_qa_helper.get_timeslot_for_wh(wh_rezon_id=wh_rezon_id, ts_type=ts_type)
return get_timeslot_id
def wait_for_batching_task(db, posting_id, timeout=500, interval=0.003): # Интервал в секундах
start_time = time.time()
print('жду')
with ThreadPoolExecutor(max_workers=10) as executor:
event = Event()
while time.time() - start_time < timeout:
res = executor.submit(db.rezon.get_batching_task_by_posting_id, posting_id).result()
if res and len(res) > 0:
return res
event.wait(timeout=interval) # Используем event.wait вместо sleep для кратковременного ожидания
return None
def get_timeslots(wh, get_timeslot_id, get_ts_for_pvz):
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future_day_to_day_timeslot = executor.submit(get_timeslot_id, ts_type="regular", wh_rezon_id=wh.rezon_id)
future_express_timeslot = executor.submit(get_timeslot_id, ts_type="express", wh_rezon_id=wh.rezon_id)
future_pvz_timeslot = executor.submit(get_ts_for_pvz, wh.clearing_id)
day_to_day_timeslot = future_day_to_day_timeslot.result()
express_timeslot = future_express_timeslot.result()
pvz_timeslot = future_pvz_timeslot.result()
return day_to_day_timeslot, express_timeslot, pvz_timeslot
class TestRegressBatching:
def test_positive_path(api, db, create_order, random_client_user_id, get_timeslot_id, get_ts_for_pvz):
wh = Warehouse_49113
day_to_day_timeslot, express_timeslot, pvz_timeslot = get_timeslots(wh, get_timeslot_id, get_ts_for_pvz)
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
future_orders = [
executor.submit(lambda: create_order(
wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_pvz",
client_id=random_client_user_id, with_bag=False,
timeslot_id=pvz_timeslot['id'], item_qty=1)),
executor.submit(lambda: create_order(
wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_day_to_day",
client_id=random_client_user_id + 1, with_bag=False,
timeslot_id=day_to_day_timeslot['id'], item_qty=1)),
executor.submit(lambda: create_order(
wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_express",
client_id=random_client_user_id + 2, with_bag=True,
timeslot_id=express_timeslot['id'], item_qty=1)),
executor.submit(lambda: create_order(
wh_rezon_id=wh.rezon_id, item_id=820683186, delivery_type="mono_countrywide",
client_id=random_client_user_id + 3, with_bag=False,
timeslot_id=pvz_timeslot['id'], item_qty=1))
]
orders_results = [future.result() for future in future_orders]
for order_result in orders_results:
print(order_result)
for posting in order_result:
res = wait_for_batching_task(db, posting["ID"])
assert res
Editor is loading...
Leave a Comment