Untitled
unknown
plain_text
2 years ago
5.1 kB
8
Indexable
import asyncio
from concurrent.futures import ThreadPoolExecutor
from random import randint
from datetime import datetime, timedelta
import pytest
from Utils.KrakenOrderHelper.express_whs import Warehouse_49113
from tests.batching.test_mono import update_posting_timeslot
@pytest.fixture(scope="session")
def random_client_user_id():
    """Yield one random 7-digit client user id, created once per test session."""
    client_user_id = randint(1000000, 9999999)
    yield client_user_id
def update_posting_timeslot(db, posting_number: str, is_express=0):
    """Rewrite a posting's timeslot to a few hours after the current UTC time.

    The new timeslot is "now + 6.5 h" for express postings and "now + 7 h"
    otherwise, truncated to the start of that hour and rendered as
    ``YYYY-MM-DD HH:00:00.000``.

    NOTE(review): this definition shadows the ``update_posting_timeslot``
    imported from ``tests.batching.test_mono`` at the top of the file.

    Args:
        db: Database helper exposing
            ``rezon.update_posting_timeslot_by_posting_number``.
        posting_number: Posting number whose timeslot is rewritten.
        is_express: Truthy for express postings (6.5 h offset instead of 7 h).
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    offset = timedelta(hours=6.5 if is_express else 7)
    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically
    # because the format string contains no timezone directive.
    shifted = datetime.now(timezone.utc) + offset
    new_timeslot = shifted.strftime("%Y-%m-%d %H:00:00.") + '000'
    print(posting_number)
    db.rezon.update_posting_timeslot_by_posting_number(posting_number, new_timeslot)
@pytest.fixture()
def get_ts_for_pvz(api):
    """Provide an async helper that looks up a self-pickup delivery variant.

    The returned coroutine function takes a ``clearing_id``, queries
    ``api.http.lms_qa_admin.get_delivery_variants`` (first page, 500 rows)
    and returns the first element of the response's ``"data"`` list.
    """
    pickup_variant_type_id = 52895552000  # self-pickup

    async def fetch_pickup_variant(clearing_id: int):
        response = api.http.lms_qa_admin.get_delivery_variants(
            page_number=1,
            page_size=500,
            sc=clearing_id,
            delivery_variant_type_id=pickup_variant_type_id,
        )
        return response["data"][0]

    return fetch_pickup_variant
@pytest.fixture()
def create_order(api, db):
    """Provide an async helper that creates an order and shifts its timeslot.

    The returned coroutine function creates an order through
    ``api.http.oe_qa_helper.create_oms_order``, loads the state rows of the
    first created posting from the database, moves that posting's timeslot
    with ``update_posting_timeslot`` and returns the loaded rows.
    """

    async def place_order(wh_rezon_id=None, item_id=None, client_id=None, delivery_type=None, with_bag=None,
                          postings_qty=None, item_qty=None, timeslot_id=None):
        created_postings = api.http.oe_qa_helper.create_oms_order(
            wh_rezon_id=wh_rezon_id,
            item_id=item_id,
            delivery_type=delivery_type,
            timeslot_id=timeslot_id,
            with_bag=with_bag,
            client_id=client_id,
            item_qty=item_qty,
            postings_qty=postings_qty,
        )
        # Only the first posting of the created order is tracked.
        first_posting_number = created_postings[0]
        order = db.rezon.get_posting_state_id(str(first_posting_number))
        print(order)
        # Express orders get the shorter timeslot offset.
        express_flag = int(delivery_type == "mono_express")
        update_posting_timeslot(db, order[0]["PostingNumber"], express_flag)
        return order

    return place_order
@pytest.fixture()
def get_timeslot_id(api, db):
    """Provide an async helper that fetches a timeslot for a warehouse.

    The returned coroutine function forwards ``ts_type`` and ``wh_rezon_id``
    to ``api.http.oe_qa_helper.get_timeslot_for_wh`` and returns its result.
    The ``db`` fixture is requested but not used by the helper itself.
    """

    async def fetch_timeslot(ts_type: str, wh_rezon_id: int):
        qa_helper = api.http.oe_qa_helper
        return qa_helper.get_timeslot_for_wh(wh_rezon_id=wh_rezon_id, ts_type=ts_type)

    return fetch_timeslot
async def wait_for_batching_task(db, posting_id, timeout=500, interval=3):
    """Poll the database until a batching task shows up for a posting.

    ``db.rezon.get_batching_task_by_posting_id(posting_id)`` is called in a
    worker thread so the blocking query does not stall the event loop.

    Args:
        db: Database helper exposing ``rezon.get_batching_task_by_posting_id``.
        posting_id: Identifier passed to the lookup on every poll.
        timeout: Maximum number of seconds to keep polling.
        interval: Pause in seconds between two polls.

    Returns:
        The first truthy lookup result, or ``None`` when ``timeout`` elapses
        without one.
    """
    # Inside a coroutine the running loop is the one to use;
    # asyncio.get_event_loop() is discouraged here.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    print('жду')
    # Polls are strictly sequential, so a single worker thread is enough.
    # The ``with`` block shuts the pool down on every exit path.
    with ThreadPoolExecutor(max_workers=1) as executor:
        while loop.time() < deadline:
            res = await loop.run_in_executor(executor, db.rezon.get_batching_task_by_posting_id, posting_id)
            print(res)
            if res:
                return res
            # Never sleep past the deadline, so ``timeout`` is honoured even
            # when it is shorter than ``interval``.
            await asyncio.sleep(max(0, min(interval, deadline - loop.time())))
    return None
async def get_timeslots(wh, get_timeslot_id, get_ts_for_pvz):
    """Collect the regular, express and self-pickup timeslots of a warehouse.

    Args:
        wh: Warehouse object providing ``rezon_id`` and ``clearing_id``.
        get_timeslot_id: Coroutine function called with ``ts_type`` and
            ``wh_rezon_id`` keyword arguments.
        get_ts_for_pvz: Coroutine function called with the clearing id.

    Returns:
        The ``asyncio.gather`` result list, ordered regular, express, pickup.
    """
    regular_lookup = get_timeslot_id(ts_type="regular", wh_rezon_id=wh.rezon_id)
    express_lookup = get_timeslot_id(ts_type="express", wh_rezon_id=wh.rezon_id)
    pickup_lookup = get_ts_for_pvz(wh.clearing_id)
    return await asyncio.gather(regular_lookup, express_lookup, pickup_lookup)
class TestRegressBatching:
    """Regression tests for order batching."""

    @pytest.mark.asyncio
    async def test_positive_path(self, api, db, create_order, random_client_user_id, get_timeslot_id,
                                 get_ts_for_pvz):
        """Create one order per delivery type on warehouse 49113.

        Orders are created for the ``mono_express``, ``mono_day_to_day``,
        ``mono_pvz`` and ``mono_countrywide`` delivery types, each with its own
        client id derived from ``random_client_user_id``.
        """
        # ``self`` was missing, so pytest bound the instance to ``api`` and the
        # ``api`` fixture was never actually injected.
        wh = Warehouse_49113
        day_to_day_timeslot, express_timeslot, pvz_timeslot = await get_timeslots(wh, get_timeslot_id, get_ts_for_pvz)
        item_id = 163275485

        # Each delivery type uses its matching timeslot. Previously the express
        # and day-to-day timeslots were swapped between the first two orders.
        orders = await asyncio.gather(
            create_order(wh_rezon_id=wh.rezon_id, item_id=item_id,
                         delivery_type="mono_express", client_id=random_client_user_id, with_bag=True,
                         timeslot_id=express_timeslot["id"]),
            create_order(wh_rezon_id=wh.rezon_id, item_id=item_id,
                         delivery_type="mono_day_to_day", client_id=random_client_user_id + 1, with_bag=True,
                         timeslot_id=day_to_day_timeslot["id"]),
            create_order(wh_rezon_id=wh.rezon_id, item_id=item_id,
                         delivery_type="mono_pvz", client_id=random_client_user_id + 2, with_bag=False,
                         timeslot_id=pvz_timeslot["id"]),
            # NOTE(review): countrywide reuses the pickup timeslot, as in the
            # original code - confirm this is intended.
            create_order(wh_rezon_id=wh.rezon_id, item_id=item_id,
                         delivery_type="mono_countrywide", client_id=random_client_user_id + 3, with_bag=False,
                         timeslot_id=pvz_timeslot["id"]),
        )
        print(orders)
        # TODO: assert that a batching task appears for every created posting,
        # e.g. ``assert await wait_for_batching_task(db, posting["ID"])``; that
        # check was left commented out and the test currently asserts nothing.
Editor is loading...
Leave a Comment