Untitled
unknown
plain_text
a month ago
15 kB
3
Indexable
import logging from io import BytesIO from typing import List import openpyxl import pandas as pd import strawberry from sqlalchemy.sql import text from strawberry import mutation, type from strawberry.file_uploads import Upload from app.graphql.schema import Info from app.graphql.types.group_type import GroupType from app.graphql.types.lov_type import LovType from app.graphql.types.product_category_type import ProductCategoryType from app.graphql.types.product_country_type import ProductCountryType from app.models import ( Group, Lov, ProductCategory, ProductCategoryVersion, ProductCountry, ProductSubcategory, ) logger = logging.getLogger(__name__) @type class ImportMutationPayload: success: bool message: str = "" def get_last_version_id(info): # Implement the logic to get the last version ID version = ( info.context.session.query(ProductCategoryVersion) .filter(ProductCategoryVersion.version == "Version") .first() ) if version: version_id = version.version_id version.version_id += 1 return version_id + 1 else: return 1 def get_full_product_country_list(info): # Implement the logic to get the full product country list products = ( info.context.session.query(ProductCountry) .filter(ProductCountry.is_deleted == 0) .all() ) return [ ProductCountryType( id=p.id, country_id=p.country_id, product_name=p.product_name, is_deleted=p.is_deleted, ) for p in products ] def get_full_product_category_list(info): # Implement the logic to get the full product category list categories = ( info.context.session.query(ProductCategory) .filter(ProductCategory.is_deleted == 0) .all() ) return [ ProductCategoryType( id=c.id, version_id=c.version_id, mdm_id=c.mdm_id, prod_cat_grp=c.prod_cat_grp, category_id=c.category_id, ta_sub_type=c.ta_sub_type, last_active=c.last_active, is_deleted=c.is_deleted, created_date=c.created_date, updated_date=c.updated_date, updated_by_id=c.updated_by_id, created_by_id=c.created_by_id, ) for c in categories ] def get_full_group_list(info): # Implement the logic to get the full group list groups = info.context.session.query(Group).filter(Group.is_deleted == 0).all() return [ GroupType( id=g.id, name=g.name, type=g.type, version_id=g.version_id, status_id=g.status_id, created_date=g.created_date, created_by_id=g.created_by_id, updated_date=g.updated_date, updated_by_id=g.updated_by_id, is_deleted=g.is_deleted, description=g.description, ) for g in groups ] def get_lov_values(info, identifier_field_name): # Implement the logic to get the list of values (LOV) for the given identifier field name lovs = ( info.context.session.query(Lov) .filter(Lov.identifier_field_name == identifier_field_name) .all() ) return [ LovType( id=lov.id, name=lov.name, value=lov.value, identifier_field_name=lov.identifier_field_name, is_deleted=lov.is_deleted, created_date=lov.created_date, created_by_id=lov.created_by_id, updated_date=lov.updated_date, updated_by_id=lov.updated_by_id, ) for lov in lovs ] async def get_mdm_id(info, product_name, country, product_country_list): # Implement the logic to get the MDM ID for the given product name and country for pc in product_country_list: country_data = await info.context.load_country_by_id.load(pc.country_id) if pc.product_name == product_name and country_data.name == country: product_country = ( info.context.session.query(ProductCountry) .filter( ProductCountry.product_name == product_name, country_data.name == country, ) .first() ) return product_country.id if product_country else None return None def get_category_id(cat_name, group_list): # Implement the logic to get the category ID for the given category name for group in group_list: if group.name == cat_name and group.type == "CATEGORY": return group.id return None def get_subcategory_ids(topic_names, group_list): # Implement the logic to get the subcategory IDs for the given topic names topic_names_list = [ topic.strip() for line in topic_names.splitlines() for topic in line.split(",") ] subcategory_ids = [] for topic in topic_names_list: for group in group_list: if group.name == topic and group.type == "SUBCATEGORY": subcategory_ids.append(group.id) break return subcategory_ids def is_valid_data( prd_group, mdm_id, cat_id, ta_sub_type, prdt_sbctgries, func_group_list, sub_ta_list, ): # Validate functional group for fg in func_group_list: if prd_group not in fg.value: return False # Validate MDM ID if mdm_id is None: return False # Validate category ID if cat_id is None: return False # Validate subcategories if not prdt_sbctgries: return False # Validate TA sub type if ta_sub_type not in [st.value for st in sub_ta_list]: return False return True def is_existing_record(mdm_id, cat_id, prd_group, ta_sub_type, product_category_list): for prd_cat in product_category_list: if ( prd_cat.mdm_id == mdm_id and prd_cat.category_id == cat_id and prd_cat.prod_cat_grp == prd_group and prd_cat.ta_sub_type == ta_sub_type ): return True return False def truncate_tables(info): try: # Implement the logic to truncate the necessary tables info.context.session.execute(text("SET FOREIGN_KEY_CHECKS=0")) info.context.session.execute(text("TRUNCATE TABLE mdm_product_category")) info.context.session.execute(text("TRUNCATE TABLE mdm_product_subcategory")) info.context.session.execute(text("SET FOREIGN_KEY_CHECKS=1")) info.context.session.commit() except Exception as e: info.context.session.rollback() return f"Error: {str(e)}" def save_product_categories(info, prd_cat_list): try: # Implement the logic to save the product categories to the database for prd_cat in prd_cat_list: product_category = ProductCategory(**prd_cat) info.context.session.add(product_category) info.context.session.commit() except Exception as e: info.context.session.rollback() return f"Error: {str(e)}" def update_product_category_version(info, last_version_id): try: # Implement the logic to update the product category version version = ( info.context.session.query(ProductCategoryVersion) .filter(ProductCategoryVersion.version == "Version") .first() ) if version: version.version_id = last_version_id info.context.session.commit() except Exception as e: info.context.session.rollback() return f"Error: {str(e)}" def save_topics_from_excel(info, df, prd_cat_list, group_list): try: # Implement the logic to save the topics from the Excel file for i, row in df.iterrows(): topic_names = row["TOPIC"].strip() subcategory_ids = get_subcategory_ids(topic_names, group_list) product_category_id = prd_cat_list[i]["category_id"] for subcat_id in subcategory_ids: product_category = ( info.context.session.query(ProductCategory) .filter(ProductCategory.category_id == product_category_id) .first() ) if product_category: product_subcategory = ProductSubcategory( pro_cat_id=product_category.id, subcategory_id=subcat_id ) info.context.session.add(product_subcategory) info.context.session.commit() except Exception as e: info.context.session.rollback() return f"Error: {str(e)}" @type class ImportFunctionalityMutation: @mutation async def read_file(self, file: Upload, import_type: str, info: Info) -> ImportMutationPayload: logger.info(f"Current User {info.context.current_user}") logger.info(f"Reading file {file}") try: # Read the file content into memory file_content = await file.read() # Load the workbook from the file content workbook = openpyxl.load_workbook(filename=BytesIO(file_content)) # Assuming you want to read the first sheet sheet = workbook.active # Read the content of the sheet into a DataFrame data = [] for row in sheet.iter_rows(values_only=True): data.append(row) df = pd.DataFrame(data[1:], columns=data[0]) # Validate headers headers = df.columns.tolist() expected_headers = [ "FUNCTIONAL GROUP", "COUNTRY", "PRODUCT", "CATEGORY", "TOPIC", "TA SUB TYPE", ] if headers != expected_headers: return ImportMutationPayload(success = False, message="This is not a correct excel to upload") # Initialize variables data_error_for_all = "" prd_cat_list = [] last_version_id = ( 1 if import_type.lower() == "truncateandproceed" else get_last_version_id(info) ) # Load necessary data for validation product_country_list = get_full_product_country_list(info) # logger.info(f"COUNTRY LIST: {product_country_list}") product_category_list = get_full_product_category_list(info) # logger.info(f"CATEGORY LIST: {product_category_list}") group_list = get_full_group_list(info) # logger.info(f"GROUP LIST: {group_list}") func_group_list = get_lov_values(info, "PROD CAT GRP") # logger.info(f"FUNCTION GROUP LIST: {func_group_list}") sub_ta_list = get_lov_values(info, "therapeutic") # logger.info(f"SUB TA LIST: {sub_ta_list}") # Process the DataFrame for i, row in df.iterrows(): prd_group = row["FUNCTIONAL GROUP"].strip() country = row["COUNTRY"].strip() product_name = row["PRODUCT"].strip() cat_name = row["CATEGORY"].strip() ta_sub_type = row["TA SUB TYPE"].strip() topic_names = row["TOPIC"].strip() # Validate data mdm_id = await get_mdm_id( info, product_name, country, product_country_list ) cat_id = get_category_id(cat_name, group_list) prdt_sbctgries = get_subcategory_ids(topic_names, group_list) if not is_valid_data( prd_group, mdm_id, cat_id, ta_sub_type, prdt_sbctgries, func_group_list, sub_ta_list, ): data_error_for_all += f"Error in data at line {i+2}: Invalid data\n" continue if ( is_existing_record( mdm_id, cat_id, prd_group, ta_sub_type, product_category_list ) and import_type.lower() != "truncateandproceed" ): data_error_for_all += f"The record at line {i+2} already exists.\n" continue # Create ProductCategory object prd_cat = { "version_id": last_version_id, "prod_cat_grp": prd_group, "mdm_id": mdm_id, "category_id": cat_id, "ta_sub_type": ta_sub_type, "last_active": 1, "is_deleted": 0, "created_date": pd.Timestamp.now(), "created_by_id": "SYSTEM", "updated_date": pd.Timestamp.now(), "updated_by_id": "SYSTEM", } prd_cat_list.append(prd_cat) last_version_id += 1 if data_error_for_all: return ImportMutationPayload(success = False, message={data_error_for_all}) # Save data to the database if import_type.lower() == "truncateandproceed": truncate_tables(info) save_product_categories(info, prd_cat_list) update_product_category_version(info, last_version_id) save_topics_from_excel(info, df, prd_cat_list, group_list) else: save_product_categories(info, prd_cat_list) update_product_category_version(info, last_version_id) save_topics_from_excel(info, df, prd_cat_list, group_list) return ImportMutationPayload(success = True, message="Import completed successfully") except Exception as e: info.context.session.rollback() return ImportMutationPayload(success = False, message=f"Error: {str(e)}") Mutations should be primarily concerned with GraphQL and types. Any non-trivial business logic should be moved into an app/actions/.... All of the utility functions (get_last_version_id, get_full_product_country_list, ...) should live in the action. Actions should not use GraphQL types, but plain python types. Instead of passing in the heavy GraphQL info object, please pass in session, login, and other fields as needed. Mutations should have their own unique payload, not a simple string type. The params to mutations should follow python/GraphQL conventions. ImportType should be import_type in python, which will appear as importType in GraphQL. However, having an import_type param is an anti-pattern in itself. The names in this file are all awful, the file name, the mutation name, etc. It's unclear what is being imported or why.
Editor is loading...
Leave a Comment