Untitled

mail@pastecode.io avatar
unknown
plain_text
2 months ago
16 kB
0
Indexable
Never
import datetime
import pytz
import pandas as pd
from pandas import (
    read_excel,
    isnull,
    isna,
    to_datetime,
    notna,
    Timestamp,
    ExcelWriter
)
import numpy
from ..repositories.plan_repository import PlanRepository
from ..repositories.models.plan import Plan
from ..repositories.models.plan import ProdigiPlanStatusEnum
from .helpers.datetime_helper import DatetimeHelper
ist_tz = pytz.timezone('Asia/Kolkata')

class PlanService():
    def __init__(self,repository : PlanRepository):
        self.repository = repository
        self.models = {}
        self.lines = {}
        self.part = {}
        self.variant = {}
        self.lines = {}

    def get_plan_part_filters(self, shop_id):  
        filters = self.repository.get_plan_filters(shop_id)
        return self.transform_filters(filters)
    
    def transform_filters(self,filters):
        filter_list = []
        for filter in filters:
            filter_list.append({
                "model_id" : filter[0],
                "model_name" : filter[1],
                "variant_id" : filter[5],
                "variant" : filter[2],
                "line_id" :  filter[3],
                "line_name" : filter[4]
            })
        return {"data" : filter_list}
    
    def upload_production_plan(self, df_plan, shop_id,plan_status_id):
        try:
            warnings = []
            errors = []
            # Validate file format and read Excel data
            # df_plan = read_excel(file_obj, sheet_name='Working plan', engine='openpyxl')
            required_columns = ['Model' , 'Part Name', 'Line','Variant']
            if not all(col in df_plan.columns for col in required_columns):
                errors.append("Missing required columns in the Plan Excel.")
            
            # Validate future dates
            errors = self.validate_future_dates(df_plan,errors)
            is_day,is_shift = self.repository.get_shop_configuration(shop_id)
          

            if(is_shift):
                errors = self.validate_previous_shift_data(shop_id,df_plan,errors)
            if(is_day):
                warnings = self.validate_previous_day_data(shop_id,df_plan,warnings)
           
            if len(errors) != 0:
                self.repository.update_plan_file_status(plan_status_id,errors=errors,warnings=warnings)
                return errors
            
            # Validate and process the plan data
            errors = self.validate_plan_data(df_plan, shop_id,errors,is_shift,is_day)
            if(len(errors) == 0):
                self.process_plan_data(df_plan, shop_id,errors,is_shift)
            self.repository.update_plan_file_status(plan_status_id,errors=errors,warnings=warnings)



        except Exception as e:
            return f"Upload unsuccessful. Reason: {str(e)}"
    

    def validate_previous_shift_data(self,shop_id, df_plan,errors):
        cur_minute = datetime.datetime.now(ist_tz).replace(second=0,microsecond=0,tzinfo=None)
        current_shift = self.shift_repository.get_shift(shop_id=shop_id,time=cur_minute.time())
        # current_shift = 'B'  # Replace with your logic to get the current shift
        current_day = DatetimeHelper.get_ist_datetime().date()

        previous_shift_data = df_plan[
            (df_plan['Shift'] < current_shift) & (df_plan['Date'] == current_day)
        ]

        if not previous_shift_data.empty:
            errors.append("Cannot upload previous shift data for the current day.")
        
        return errors
        
    
    def validate_previous_day_data(self,shop_id,df_plan,warnings):
        """Warn about every date column that lies before today.

        "Today" is the date reported by ``DatetimeHelper.get_ist_datetime()``.
        Only column labels that are ``datetime.datetime`` instances are
        considered. ``shop_id`` is accepted but not used.

        Returns:
            The ``warnings`` list that was passed in, with one message added
            per past date column.
        """
        cutoff = DatetimeHelper.get_ist_datetime().date()
        stale_days = [
            column.date()
            for column in df_plan.columns
            if isinstance(column, datetime.datetime) and column.date() < cutoff
        ]
        warnings.extend(
            f"previous data of date {day} is being ignored" for day in stale_days
        )
        return warnings

    def validate_future_dates(self, df_plan, errors):
        """Check that the sheet has at most 30 date columns from today onwards.

        Counts the column labels that are ``datetime.datetime`` instances
        dated today (per ``DatetimeHelper.get_ist_datetime()``) or later.

        Returns:
            The ``errors`` list that was passed in, with one message added
            when the count exceeds 30.
        """
        start_day = DatetimeHelper.get_ist_datetime().date()
        upcoming_count = sum(
            1
            for column in df_plan.columns
            if isinstance(column, datetime.datetime) and column.date() >= start_day
        )
        if upcoming_count > 30:
            errors.append("The Plan Excel should have a maximum of 30 days data in future dates.")
        return errors
            
    
    def validate_plan_data(self,df_plan, shop_id,errors,is_shift,is_day=None):
        """Validate each plan row's quantities and master-data references.

        Refreshes the part/variant/model/line lookup caches for ``shop_id``
        and appends one message to ``errors`` per problem found: missing or
        non-integer quantities in date columns dated today or later, missing
        Model/Variant/Part Name/Line cells (and Shift when ``is_shift``), and
        names or part combinations that are not present in the master data.

        Args:
            df_plan: Plan sheet; date columns are the ones whose label is a
                ``datetime.datetime``.
            shop_id: Shop whose master data is loaded for the lookups.
            errors: List that messages are appended to.
            is_shift: When truthy, a missing Shift cell is reported.
            is_day: Optional day-mode flag; accepted so callers may pass it
                positionally, not used by the validation itself.

        Returns:
            The ``errors`` list that was passed in.
        """
        self.get_part_by_shop_id(shop_id=shop_id)
        self.get_variant_by_shop_id(shop_id=shop_id)
        self.get_models_by_shop_id(shop_id=shop_id)
        self.get_line_by_line_id(shop_id=shop_id)
        today = DatetimeHelper.get_ist_datetime().date()

        # Column-level checks do not depend on the row, so run them once
        # instead of repeating the same message for every row.
        date_columns = []
        for date_column in df_plan.columns:
            if isinstance(date_column, datetime.datetime):
                if date_column.date() >= today:
                    date_columns.append(date_column)
            elif date_column is None or date_column == 'NaN':
                errors.append("Missing Date Column ")

        # Messages report ``index_label + 2`` as the row — presumably the
        # spreadsheet row (header line plus 1-based numbering).
        for index_label, row in df_plan.iterrows():
            raw_model = row['Model']
            raw_variant = row['Variant']
            raw_part_name = row['Part Name']
            raw_line_name = row['Line']
            # 'Shift' is not a required column, so do not assume it exists.
            raw_shift = row.get('Shift')
            shift = None if isna(raw_shift) else raw_shift

            for date_column in date_columns:
                quantity = row.get(date_column)
                if quantity is None or isna(quantity):
                    errors.append(f"Planned Quantity Missing at {date_column}, {index_label+2}")
                # numpy integer scalars are not ``int`` subclasses but are
                # valid whole-number quantities.
                elif not isinstance(quantity, (int, numpy.integer)):
                    errors.append(f"Value at {date_column},{index_label+2} is not int")

            # Missing-cell checks must look at the raw cell: once converted
            # with ``str()`` a missing value becomes the text "nan".
            has_missing_cell = False
            if isna(raw_model):
                errors.append(f"Model : missing row {index_label+2} in working plan")
                has_missing_cell = True
            if isna(raw_variant):
                errors.append(f"Variant : missing row {index_label+2} in working plan")
                has_missing_cell = True
            if isna(raw_part_name):
                errors.append(f"Part Name : missing row {index_label+2} in working plan")
                has_missing_cell = True
            if isna(raw_line_name):
                errors.append(f"line_name : missing row {index_label + 2} in working Plan")
                has_missing_cell = True
            if is_shift and shift is None:
                errors.append(f"Shift : missing row {index_label + 2} in working Plan")
            if has_missing_cell:
                # A master lookup on a missing name would only add noise.
                continue

            model = str(raw_model).strip().upper()
            variant = str(raw_variant).strip().upper()
            part_name = str(raw_part_name).strip().upper()
            line_name = str(raw_line_name).strip().upper()

            variant_id = self.variant.get(variant)
            model_id = self.models.get(model)
            line = self.lines.get(line_name)
            part_id = None

            # The part key can only be built once all three ids are resolved.
            if variant_id is not None and model_id is not None and line is not None:
                part_id = self.part.get(f"{part_name}-{variant_id}-{model_id}-{line}")

            if variant_id is None:
                errors.append(f"{variant} variant is not present in master at {index_label+2}")
            if model_id is None:
                errors.append(f"{model} model is not present in master at {index_label+2}")
            if part_id is None:
                errors.append(f"{part_name}+{variant}+{model}+{line_name}, This Combination is not present in master at {index_label+2}")
            if line is None:
                errors.append(f"{line_name} line is not present in master at {index_label+2}")

        return errors

    def process_plan_data(self, df_plan, shop_id,errors,is_shift):
        """Write the plan rows to the repository, one sheet row at a time.

        For every row the planned quantities of all date columns dated today
        or later are collected. Dates that already have a plan are updated
        (only while ``errors`` is empty); the remaining dates are inserted as
        new ``Plan`` records with status ``PLANNED``. Each row is committed
        separately.

        Relies on the name -> id caches (``self.variant``, ``self.models``,
        ``self.lines``, ``self.part``) being loaded already;
        ``validate_plan_data`` loads them. ``is_shift`` is accepted but not
        used here.

        Args:
            df_plan: Plan sheet; date columns are the ones whose label is a
                ``datetime.datetime``.
            shop_id: Shop id stored on every inserted plan record.
            errors: Error list; updates are skipped when it is not empty.
            is_shift: Unused.

        Returns:
            The ``errors`` list that was passed in.

        Raises:
            KeyError: If the part/variant/model/line combination of a row is
                not in the part cache.
        """
        today = DatetimeHelper.get_ist_datetime().date()
        # The set of relevant date columns is the same for every row.
        date_columns = [
            column
            for column in df_plan.columns
            if isinstance(column, datetime.datetime) and column.date() >= today
        ]

        for index_label, row in df_plan.iterrows():
            model = str(row['Model']).strip().upper()
            variant = str(row['Variant']).strip().upper()
            part_name = str(row['Part Name']).strip().upper()
            line_name = str(row['Line']).strip().upper()
            # 'Shift' is not a required column, so do not assume it exists.
            raw_shift = row.get('Shift')
            shift = None if isna(raw_shift) else raw_shift
            planned_quantities = {
                column.date(): row.get(column) for column in date_columns
            }

            variant_id = self.variant.get(variant)
            model_id = self.models.get(model)
            line = self.lines.get(line_name)
            part_id = None

            if variant_id is not None and model_id is not None and line is not None:
                part_id = self.part[f"{part_name}-{variant_id}-{model_id}-{line}"]

            # Dates that already have a plan are updated; the rest are inserted.
            exiting_production_dates = self.repository.get_existing_plan(part_id,variant_id,model_id,line,list(planned_quantities.keys()),shift)
            production_dates_need_to_be_entered = set(planned_quantities) - set(exiting_production_dates.keys())

            for date, existing_plan in exiting_production_dates.items():
                if planned_quantities.get(date) is not None and len(errors) == 0:
                    self.repository.update_the_existing_planned_quantity(existing_plan,planned_quantities.get(date))

            # Fresh list per row: a list shared across rows would hand the
            # records of earlier rows to insert_plan again on every iteration.
            plan_items_need_to_inserted = []
            for planned_key in production_dates_need_to_be_entered:
                plan_model = Plan()
                plan_model.line_id = line
                plan_model.variant_id = variant_id
                plan_model.part_id = part_id
                plan_model.production_date = planned_key
                plan_model.shift = shift
                plan_model.shop_id = shop_id
                plan_model.model_id = model_id
                plan_model.status = ProdigiPlanStatusEnum.PLANNED.value
                plan_model.planned_quantity = planned_quantities[planned_key]
                plan_items_need_to_inserted.append(plan_model)

            if plan_items_need_to_inserted:
                self.insert_plan(plan_items_need_to_inserted)
            # Commit per row so a later row sees the plans written by this one.
            self.repository.commit()
        return errors

    def get_models_by_shop_id(self,shop_id):
        data = self.repository.get_models_by_shop_id(shop_id=shop_id)
        for each in data:
            id,name = each
            self.models[name] = id

    def get_line_by_shop_id(self,shop_id):
        data = self.repository.get_line_by_shop_id(shop_id=shop_id)
        for each in data:
            id,name = each
            self.lines[name] = id

    def get_variant_by_shop_id(self,shop_id):
        data = self.repository.get_variant_by_shop_id(shop_id)
        for each in data:
            id,name = each
            self.variant[name] = id

    def get_part_by_shop_id(self,shop_id):
        data = self.repository.get_part_by_shop_id(shop_id=shop_id)
        for each in data:
            id ,name,variant_id,model_id,line_id  = each
            self.part[name+"-"+str(variant_id)+"-"+str(model_id)+"-"+str(line_id)] = id
        return self.part
    
    def get_line_by_line_id(self,shop_id):
        data = self.repository.get_line_by_shop_id(shop_id=shop_id)
        for each in data:
            id,name = each
            self.lines[name] = id
    
    def insert_plan(self,plan_items_need_to_inserted):
        self.repository.insert_plan(plan_items_need_to_inserted)

    def get_plan(self,page_no=1,
                 page_size=20,
                 model_list=[],
                 line_list=[],
                 variant_list=[],
                 status=None,
                 shift=None,
                 from_dt=None,
                 to_dt=None,
                 shop_id=7):
        data  = self.repository.get_plan(page_no=page_no,
                page_size=page_size,
                model_list=model_list,
                line_list=line_list,
                variant_list=variant_list,
                status=status,
                shift=shift,
                from_dt=from_dt,
                to_dt=to_dt,
                shop_id=shop_id)
        return self.transform_plan_data(data)

    def transform_plan_data(self,data):
        result = []
        for each in data : 
            id, line_id, line_name, model_id, model_name, variant_id, variant_name, production_date, shift, planned_quantity, actual_quantity, status,part_id,part_name = each
            result.append({
                    "id" : id,
                    "line_id" : line_id,
                    "line_name" : line_name,
                    "model_id" : model_id,
                    "model_name" : model_name,
                    "variant_id" : variant_id,
                    "variant_name" : variant_name,
                    "production_date" : str(production_date),
                    "shift" : shift,
                    "planned_quantity" : planned_quantity,
                    "actual_quantity" : actual_quantity,
                    "status" : status,
                    "part_id" : part_id,
                    "part_name" : part_name
                })
        
        return result


    def default_plan_job(self):
        """Walk all shops and build the part combinations of each one.

        NOTE: the job is unfinished — the combinations computed per shop are
        not stored or returned, so the method currently only performs the
        repository reads and returns ``None``.
        """
        raw_shops = self.repository.get_all_shops()
        shop_data = self.transform_shop_data_for_default_plan_job(raw_shops)
        today_date = DatetimeHelper.get_ist_datetime().date()
        part_unique_comb = {}
        for shop in shop_data:
            is_day = shop["is_day"]
            is_shift = shop["is_shift"]
            shop_id = int(shop["shop_id"])
            shop_parts = self.repository.get_part_by_shop_id(shop_id=shop_id)
            part_unique_comb = self.transform_part_data_for_default_plan_job(shop_parts,is_shift)



    def transform_part_data_for_default_plan_job(self,data,is_shift):
        part_unique_comb = {}
        for each in data:
            id, name, variant_id, model_id, line_id = each
            if is_shift:
                part_unique_comb[
                    str(id) + "-" + str(variant_id) + "-" + str(model_id) + "-" + str(line_id) + "-" + "A"] = id
                part_unique_comb[
                    str(id) + "-" + str(variant_id) + "-" + str(model_id) + "-" + str(line_id) + "-" + "B"] = id
                part_unique_comb[
                    str(id) + "-" + str(variant_id) + "-" + str(model_id) + "-" + str(line_id) + "-" + "C"] = id
            else:
                part_unique_comb[
                    str(id) + "-" + str(variant_id) + "-" + str(model_id) + "-" + str(line_id) + "-" + "_"] = id

        return part_unique_comb

    def transform_shop_data_for_default_plan_job(self,data):
        result = []
        for each in data:
            id,shop_id,is_day,is_shift = each
            result.append({
                "id" : id,
               "shop_id" : shop_id,
                "is_day" : is_day,
                "is_shift" : is_shift
            })
        return result



        
Leave a Comment