Untitled
unknown
plain_text
a year ago
16 kB
2
Indexable
import datetime

import numpy
import pandas as pd
import pytz
from pandas import (
    read_excel,
    isnull,
    isna,
    to_datetime,
    notna,
    Timestamp,
    ExcelWriter,
)

from ..repositories.plan_repository import PlanRepository
from ..repositories.models.plan import Plan
from ..repositories.models.plan import ProdigiPlanStatusEnum
from .helpers.datetime_helper import DatetimeHelper

ist_tz = pytz.timezone('Asia/Kolkata')


class PlanService():
    """Validate, store and read production plans through a PlanRepository.

    The service keeps four name -> id lookup dicts (``models``, ``lines``,
    ``variant``, ``part``) that are filled by the ``get_*_by_shop_id``
    loaders.  Nothing in this class ever clears them, so one instance that is
    reused for several shops accumulates the entries of all of them.

    NOTE(review): the spreadsheet values are upper-cased before lookup while
    the master names are stored as returned by the repository — this assumes
    the master names are already upper-case; confirm.
    """

    def __init__(self, repository: PlanRepository, shift_repository=None):
        """Store the repositories and start with empty lookup dicts.

        Args:
            repository: Plan repository used for every persistence call.
            shift_repository: Optional object exposing
                ``get_shift(shop_id=..., time=...)``; only needed by
                ``validate_previous_shift_data``.
        """
        self.repository = repository
        self.shift_repository = shift_repository
        self.models = {}
        self.lines = {}
        self.part = {}
        self.variant = {}

    @staticmethod
    def _part_key(part_name, variant_id, model_id, line_id):
        """Build the composite key under which a part is stored in ``self.part``."""
        return f"{part_name}-{variant_id}-{model_id}-{line_id}"

    @staticmethod
    def _future_date_columns(df_plan, today):
        """Return the datetime column headers of ``df_plan`` dated ``today`` or later."""
        return [
            column
            for column in df_plan.columns
            if isinstance(column, datetime.datetime) and column.date() >= today
        ]

    def get_plan_part_filters(self, shop_id):
        """Fetch the plan filters of ``shop_id`` and return them as ``{"data": [...]}``."""
        filters = self.repository.get_plan_filters(shop_id)
        return self.transform_filters(filters)

    def transform_filters(self, filters):
        """Convert positional filter rows into dicts wrapped in ``{"data": [...]}``.

        Each row is indexed as (model_id, model_name, variant, line_id,
        line_name, variant_id).
        """
        filter_list = [
            {
                "model_id": item[0],
                "model_name": item[1],
                "variant_id": item[5],
                "variant": item[2],
                "line_id": item[3],
                "line_name": item[4],
            }
            for item in filters
        ]
        return {"data": filter_list}

    def upload_production_plan(self, df_plan, shop_id, plan_status_id):
        """Validate a plan sheet and, when it is clean, persist it.

        Args:
            df_plan: DataFrame of the plan sheet.
            shop_id: Shop the plan belongs to.
            plan_status_id: Identifier passed to
                ``repository.update_plan_file_status``.

        Returns:
            The list of errors when the early checks fail, a string starting
            with ``"Upload unsuccessful."`` when any exception is raised, and
            ``None`` otherwise (row-level errors are only written through
            ``update_plan_file_status``).
        """
        try:
            warnings = []
            errors = []

            required_columns = ['Model', 'Part Name', 'Line', 'Variant']
            if not all(col in df_plan.columns for col in required_columns):
                errors.append("Missing required columns in the Plan Excel.")

            errors = self.validate_future_dates(df_plan, errors)

            is_day, is_shift = self.repository.get_shop_configuration(shop_id)
            if is_shift:
                errors = self.validate_previous_shift_data(shop_id, df_plan, errors)
            if is_day:
                warnings = self.validate_previous_day_data(shop_id, df_plan, warnings)

            # Stop before the row-level checks: they index the required
            # columns directly and would fail on a malformed sheet.
            if errors:
                self.repository.update_plan_file_status(
                    plan_status_id, errors=errors, warnings=warnings
                )
                return errors

            errors = self.validate_plan_data(df_plan, shop_id, errors, is_shift, is_day)
            if not errors:
                self.process_plan_data(df_plan, shop_id, errors, is_shift)
            self.repository.update_plan_file_status(
                plan_status_id, errors=errors, warnings=warnings
            )
        except Exception as e:
            # Top-level boundary: the failure is reported to the caller as text.
            return f"Upload unsuccessful. Reason: {str(e)}"

    def validate_previous_shift_data(self, shop_id, df_plan, errors):
        """Append an error when the sheet has rows of an earlier shift of today.

        Raises:
            AttributeError: If no shift repository has been set on the service.

        NOTE(review): this filters on a ``'Date'`` column, whereas the other
        validators treat dates as column headers — confirm the expected sheet
        layout.
        """
        shift_repository = getattr(self, "shift_repository", None)
        if shift_repository is None:
            raise AttributeError(
                "PlanService.shift_repository is not set; "
                "it is required to validate shift data"
            )

        cur_minute = datetime.datetime.now(ist_tz).replace(
            second=0, microsecond=0, tzinfo=None
        )
        current_shift = shift_repository.get_shift(
            shop_id=shop_id, time=cur_minute.time()
        )
        current_day = DatetimeHelper.get_ist_datetime().date()

        previous_shift_data = df_plan[
            (df_plan['Shift'] < current_shift) & (df_plan['Date'] == current_day)
        ]
        if not previous_shift_data.empty:
            errors.append("Cannot upload previous shift data for the current day.")
        return errors

    def validate_previous_day_data(self, shop_id, df_plan, warnings):
        """Append one warning per datetime column header dated before today."""
        today = DatetimeHelper.get_ist_datetime().date()
        for column in df_plan.columns:
            if isinstance(column, datetime.datetime) and column.date() < today:
                warnings.append(
                    f"previous data of date {column.date()} is being ignored"
                )
        return warnings

    def validate_future_dates(self, df_plan, errors):
        """Append an error when more than 30 date columns are dated today or later."""
        today = DatetimeHelper.get_ist_datetime().date()
        if len(self._future_date_columns(df_plan, today)) > 30:
            errors.append(
                "The Plan Excel should have a maximum of 30 days data in future dates."
            )
        return errors

    def validate_plan_data(self, df_plan, shop_id, errors, is_shift, is_day=None):
        """Check every row of the sheet against the master data of ``shop_id``.

        Reloads the part, variant, model and line lookups, then appends one
        message to ``errors`` for each problem found.

        Args:
            df_plan: DataFrame of the plan sheet.
            shop_id: Shop whose master data is used.
            errors: List that receives the error messages.
            is_shift: When truthy, a missing ``Shift`` cell is an error.
            is_day: Accepted because ``upload_production_plan`` passes it;
                not used by the checks.

        Returns:
            The same ``errors`` list.
        """
        self.get_part_by_shop_id(shop_id=shop_id)
        self.get_variant_by_shop_id(shop_id=shop_id)
        self.get_models_by_shop_id(shop_id=shop_id)
        self.get_line_by_line_id(shop_id=shop_id)

        today = DatetimeHelper.get_ist_datetime().date()
        # The set of date columns is the same for every row, so compute it once.
        date_columns = self._future_date_columns(df_plan, today)

        # A blank header is a property of the sheet, not of a row: report it
        # once per offending column.
        for column in df_plan.columns:
            if column is None or column == 'NaN':
                errors.append("Missing Date Column ")

        for index_label, row in df_plan.iterrows():
            # Keep the raw cells: after str() a missing value becomes the
            # text "NAN" and can no longer be detected with isna().
            raw_variant = row['Variant']
            raw_part_name = row['Part Name']
            raw_line_name = row['Line']
            raw_shift = row.get('Shift')

            model = str(row['Model']).strip().upper()
            variant = str(raw_variant).strip().upper()
            part_name = str(raw_part_name).strip().upper()
            line_name = str(raw_line_name).strip().upper()
            shift = None if isna(raw_shift) else raw_shift

            # "+ 2" presumably maps the 0-based index to the spreadsheet row
            # (header row plus 1-based numbering) — TODO confirm.
            for date_column in date_columns:
                quantity = row.get(date_column)
                if isna(quantity):
                    errors.append(
                        f"Planned Quantity Missing at {date_column}, {index_label + 2}"
                    )
                elif not isinstance(quantity, (int, numpy.integer)):
                    # numpy.integer is accepted because cells read through
                    # pandas may be NumPy scalars rather than Python ints.
                    errors.append(f"Value at {date_column},{index_label+2} is not int")

            if isna(raw_variant):
                errors.append(f"Variant : missing row {index_label+2} in working plan")
            if isna(raw_part_name):
                errors.append(f"Part Name : missing row {index_label+2} in working plan")
            if isna(raw_line_name):
                errors.append(f"line_name : missing row {index_label + 2} in working Plan")
            if is_shift and shift is None:
                errors.append(f"Shift : missing row {index_label + 2} in working Plan")

            variant_id = self.variant.get(variant)
            model_id = self.models.get(model)
            line = self.lines.get(line_name)
            part_id = None
            if variant_id is not None and model_id is not None and line is not None:
                part_id = self.part.get(
                    self._part_key(part_name.strip(), variant_id, model_id, line)
                )

            if variant_id is None:
                errors.append(f"{variant} variant is not present in master at {index_label+2}")
            if model_id is None:
                errors.append(f"{model} model is not present in master at {index_label+2}")
            if part_id is None:
                errors.append(f"{part_name}+{variant}+{model}+{line_name}, This Combination is not present in master at {index_label+2}")
            if line is None:
                errors.append(f"{line_name} line is not present in master at {index_label+2}")
        return errors

    def process_plan_data(self, df_plan, shop_id, errors, is_shift):
        """Update existing plan rows and insert the missing ones, then commit.

        Relies on the lookup dicts filled by ``validate_plan_data``.

        Args:
            df_plan: DataFrame of the plan sheet.
            shop_id: Shop stored on every inserted ``Plan``.
            errors: Error list; existing rows are only updated while it is empty.
            is_shift: Not used here.

        Returns:
            The ``errors`` list, unchanged.
        """
        today = DatetimeHelper.get_ist_datetime().date()
        date_columns = self._future_date_columns(df_plan, today)
        plan_items_need_to_inserted = []

        for index_label, row in df_plan.iterrows():
            model = str(row['Model']).strip().upper()
            variant = str(row['Variant']).strip().upper()
            part_name = str(row['Part Name']).strip().upper()
            line_name = str(row['Line']).strip().upper()
            raw_shift = row.get('Shift')
            shift = None if isna(raw_shift) else raw_shift

            planned_quantities = {
                date_column.date(): row.get(date_column)
                for date_column in date_columns
            }

            variant_id = self.variant.get(variant)
            model_id = self.models.get(model)
            line = self.lines.get(line_name)
            part_id = None
            if variant_id is not None and model_id is not None and line is not None:
                part_id = self.part[
                    self._part_key(part_name.strip(), variant_id, model_id, line)
                ]

            # Dates that already have a plan entry are updated in place;
            # the remaining dates become new Plan rows.
            existing_production_dates = self.repository.get_existing_plan(
                part_id, variant_id, model_id, line,
                list(planned_quantities.keys()), shift
            )
            dates_to_insert = set(planned_quantities) - set(existing_production_dates)

            for date in existing_production_dates:
                if planned_quantities.get(date) is not None and len(errors) == 0:
                    self.repository.update_the_existing_planned_quantity(
                        existing_production_dates[date], planned_quantities.get(date)
                    )

            for planned_key in dates_to_insert:
                plan_model = Plan()
                plan_model.line_id = line
                plan_model.variant_id = variant_id
                plan_model.part_id = part_id
                plan_model.production_date = planned_key
                plan_model.shift = shift
                plan_model.shop_id = shop_id
                plan_model.model_id = model_id
                plan_model.status = ProdigiPlanStatusEnum.PLANNED.value
                plan_model.planned_quantity = planned_quantities[planned_key]
                plan_items_need_to_inserted.append(plan_model)

        self.insert_plan(plan_items_need_to_inserted)
        self.repository.commit()
        return errors

    def get_models_by_shop_id(self, shop_id):
        """Load the (id, name) model rows of ``shop_id`` into ``self.models``."""
        data = self.repository.get_models_by_shop_id(shop_id=shop_id)
        for model_id, name in data:
            self.models[name] = model_id

    def get_line_by_shop_id(self, shop_id):
        """Load the (id, name) line rows of ``shop_id`` into ``self.lines``."""
        data = self.repository.get_line_by_shop_id(shop_id=shop_id)
        for line_id, name in data:
            self.lines[name] = line_id

    def get_variant_by_shop_id(self, shop_id):
        """Load the (id, name) variant rows of ``shop_id`` into ``self.variant``."""
        data = self.repository.get_variant_by_shop_id(shop_id)
        for variant_id, name in data:
            self.variant[name] = variant_id

    def get_part_by_shop_id(self, shop_id):
        """Load the parts of ``shop_id`` into ``self.part`` and return that dict.

        Keys have the form ``name-variant_id-model_id-line_id``.
        """
        data = self.repository.get_part_by_shop_id(shop_id=shop_id)
        for part_id, name, variant_id, model_id, line_id in data:
            self.part[self._part_key(name, variant_id, model_id, line_id)] = part_id
        return self.part

    def get_line_by_line_id(self, shop_id):
        """Alias of ``get_line_by_shop_id``, kept for existing callers."""
        self.get_line_by_shop_id(shop_id=shop_id)

    def insert_plan(self, plan_items_need_to_inserted):
        """Hand the given plan items to ``repository.insert_plan``."""
        self.repository.insert_plan(plan_items_need_to_inserted)

    def get_plan(self, page_no=1, page_size=20, model_list=None, line_list=None,
                 variant_list=None, status=None, shift=None, from_dt=None,
                 to_dt=None, shop_id=7):
        """Query the repository for plan rows and return them as dicts.

        The three list filters default to ``None`` and are passed on as empty
        lists, so no list object is shared between calls.
        """
        data = self.repository.get_plan(
            page_no=page_no,
            page_size=page_size,
            model_list=[] if model_list is None else model_list,
            line_list=[] if line_list is None else line_list,
            variant_list=[] if variant_list is None else variant_list,
            status=status,
            shift=shift,
            from_dt=from_dt,
            to_dt=to_dt,
            shop_id=shop_id,
        )
        return self.transform_plan_data(data)

    def transform_plan_data(self, data):
        """Convert 14-field plan rows into dicts; ``production_date`` becomes a string."""
        result = []
        for each in data:
            (plan_id, line_id, line_name, model_id, model_name, variant_id,
             variant_name, production_date, shift, planned_quantity,
             actual_quantity, status, part_id, part_name) = each
            result.append({
                "id": plan_id,
                "line_id": line_id,
                "line_name": line_name,
                "model_id": model_id,
                "model_name": model_name,
                "variant_id": variant_id,
                "variant_name": variant_name,
                "production_date": str(production_date),
                "shift": shift,
                "planned_quantity": planned_quantity,
                "actual_quantity": actual_quantity,
                "status": status,
                "part_id": part_id,
                "part_name": part_name,
            })
        return result

    def default_plan_job(self):
        """Build the part/shift combination map for every shop.

        NOTE(review): the map is computed and then discarded — the job looks
        unfinished; nothing is written back through the repository.
        """
        shop_data = self.transform_shop_data_for_default_plan_job(
            self.repository.get_all_shops()
        )
        part_unique_comb = {}
        for shop_metadata in shop_data:
            is_shift = shop_metadata["is_shift"]
            shop_id = int(shop_metadata["shop_id"])
            data = self.repository.get_part_by_shop_id(shop_id=shop_id)
            part_unique_comb = self.transform_part_data_for_default_plan_job(
                data, is_shift
            )

    def transform_part_data_for_default_plan_job(self, data, is_shift):
        """Map ``id-variant_id-model_id-line_id-<suffix>`` keys to the part id.

        The suffix is ``A``, ``B`` and ``C`` when ``is_shift`` is truthy, and
        ``_`` otherwise.
        """
        suffixes = ("A", "B", "C") if is_shift else ("_",)
        part_unique_comb = {}
        for part_id, name, variant_id, model_id, line_id in data:
            for suffix in suffixes:
                key = f"{part_id}-{variant_id}-{model_id}-{line_id}-{suffix}"
                part_unique_comb[key] = part_id
        return part_unique_comb

    def transform_shop_data_for_default_plan_job(self, data):
        """Convert (id, shop_id, is_day, is_shift) rows into dicts."""
        return [
            {
                "id": row_id,
                "shop_id": shop_id,
                "is_day": is_day,
                "is_shift": is_shift,
            }
            for row_id, shop_id, is_day, is_shift in data
        ]
Editor is loading...
Leave a Comment