Untitled
unknown
plain_text
2 years ago
16 kB
6
Indexable
import datetime
import pytz
import pandas as pd
from pandas import (
read_excel,
isnull,
isna,
to_datetime,
notna,
Timestamp,
ExcelWriter
)
import numpy
from ..repositories.plan_repository import PlanRepository
from ..repositories.models.plan import Plan
from ..repositories.models.plan import ProdigiPlanStatusEnum
from .helpers.datetime_helper import DatetimeHelper
ist_tz = pytz.timezone('Asia/Kolkata')
class PlanService():
def __init__(self,repository : PlanRepository):
self.repository = repository
self.models = {}
self.lines = {}
self.part = {}
self.variant = {}
self.lines = {}
def get_plan_part_filters(self, shop_id):
filters = self.repository.get_plan_filters(shop_id)
return self.transform_filters(filters)
def transform_filters(self,filters):
filter_list = []
for filter in filters:
filter_list.append({
"model_id" : filter[0],
"model_name" : filter[1],
"variant_id" : filter[5],
"variant" : filter[2],
"line_id" : filter[3],
"line_name" : filter[4]
})
return {"data" : filter_list}
def upload_production_plan(self, df_plan, shop_id,plan_status_id):
try:
warnings = []
errors = []
# Validate file format and read Excel data
# df_plan = read_excel(file_obj, sheet_name='Working plan', engine='openpyxl')
required_columns = ['Model' , 'Part Name', 'Line','Variant']
if not all(col in df_plan.columns for col in required_columns):
errors.append("Missing required columns in the Plan Excel.")
# Validate future dates
errors = self.validate_future_dates(df_plan,errors)
is_day,is_shift = self.repository.get_shop_configuration(shop_id)
if(is_shift):
errors = self.validate_previous_shift_data(shop_id,df_plan,errors)
if(is_day):
warnings = self.validate_previous_day_data(shop_id,df_plan,warnings)
if len(errors) != 0:
self.repository.update_plan_file_status(plan_status_id,errors=errors,warnings=warnings)
return errors
# Validate and process the plan data
errors = self.validate_plan_data(df_plan, shop_id,errors,is_shift,is_day)
if(len(errors) == 0):
self.process_plan_data(df_plan, shop_id,errors,is_shift)
self.repository.update_plan_file_status(plan_status_id,errors=errors,warnings=warnings)
except Exception as e:
return f"Upload unsuccessful. Reason: {str(e)}"
def validate_previous_shift_data(self,shop_id, df_plan,errors):
cur_minute = datetime.datetime.now(ist_tz).replace(second=0,microsecond=0,tzinfo=None)
current_shift = self.shift_repository.get_shift(shop_id=shop_id,time=cur_minute.time())
# current_shift = 'B' # Replace with your logic to get the current shift
current_day = DatetimeHelper.get_ist_datetime().date()
previous_shift_data = df_plan[
(df_plan['Shift'] < current_shift) & (df_plan['Date'] == current_day)
]
if not previous_shift_data.empty:
errors.append("Cannot upload previous shift data for the current day.")
return errors
def validate_previous_day_data(self,shop_id,df_plan,warnings):
today = DatetimeHelper.get_ist_datetime().date()
previous_dates = []
for date_column in df_plan.columns:
if isinstance(date_column, datetime.datetime) and date_column.date() < today:
previous_dates.append(date_column.date())
for date in previous_dates:
warnings.append(f"previous data of date {date} is being ignored")
return warnings
def validate_future_dates(self, df_plan, errors):
today = DatetimeHelper.get_ist_datetime().date()
future_dates = []
for date_column in df_plan.columns:
if isinstance(date_column, datetime.datetime) and date_column.date() >= today:
future_dates.append(date_column.date())
if len(future_dates) > 30:
errors.append("The Plan Excel should have a maximum of 30 days data in future dates.")
return errors
def validate_plan_data(self,df_plan, shop_id,errors,is_shift):
self.get_part_by_shop_id(shop_id=shop_id)
self.get_variant_by_shop_id(shop_id=shop_id)
self.get_models_by_shop_id(shop_id=shop_id)
self.get_line_by_line_id(shop_id=shop_id)
today = DatetimeHelper.get_ist_datetime().date()
# Iterate through the DataFrame and process each row
for index_label, row in df_plan.iterrows():
model = str(row['Model']).strip().upper()
variant = str(row['Variant']).strip().upper()
part_name = str(row['Part Name']).strip().upper()
line_name = str(row['Line']).strip().upper()
shift = None if isna(row['Shift']) else row['Shift']
for date_column in df_plan.columns:
if date_column not in ['Model', 'Variant', 'Part Name', 'Line', 'Shift'] and isinstance(date_column, datetime.datetime) and date_column.date() >= today: # Exclude non-date columns
if isna(row.get(date_column)) or row.get(date_column) == None:
errors.append(f"Planned Quantity Missing at {date_column}, index_label")
else:
if isinstance(row.get(date_column),int):
pass
else:
errors.append(f"Value at {date_column},{index_label+2} is not int")
if date_column == None or date_column == 'NaN':
errors.append("Missing Date Column ")
if isna(variant):
errors.append(f"Variant : missing row {index_label+2} in working plan")
if isna(part_name):
errors.append(f"Part Name : missing row {index_label+2} in working plan")
if isna(line_name):
errors.append(f"line_name : missing row {index_label + 2} in working Plan")
if is_shift and isna(shift):
errors.append(f"Shift : missing row {index_label + 2} in working Plan")
variant_id = self.variant.get(variant)
model_id = self.models.get(model)
line = self.lines.get(line_name)
part_id = None
if variant_id != None and model_id != None and line != None:
part_id = self.part.get(part_name.strip()+"-"+str(self.variant[variant])+"-"+str(self.models[model])+"-"+str(line))
if variant_id == None:
errors.append(f"{variant} variant is not present in master at {index_label+2}")
if model_id == None:
errors.append(f"{model} model is not present in master at {index_label+2}")
if part_id == None :
errors.append(f"{part_name}+{variant}+{model}+{line_name}, This Combination is not present in master at {index_label+2}")
if line == None:
errors.append(f"{line_name} line is not present in master at {index_label+2}")
return errors
def process_plan_data(self, df_plan, shop_id,errors,is_shift):
today = DatetimeHelper.get_ist_datetime().date()
plan_items_need_to_inserted = []
# Iterate through the DataFrame and process each row
for index_label, row in df_plan.iterrows():
model = str(row['Model']).strip().upper()
variant = str(row['Variant']).strip().upper()
part_name = str(row['Part Name']).strip().upper()
line_name = str(row['Line']).strip().upper()
shift = None if isna(row['Shift']) else row['Shift']
planned_quantities = {}
for date_column in df_plan.columns:
if date_column not in ['Model', 'Variant', 'Part Name', 'Line', 'Shift'] and isinstance(date_column, datetime.datetime) and date_column.date() >= today: # Exclude non-date columns
planned_quantities[date_column.date()] = row.get(date_column)
variant_id = self.variant.get(variant)
model_id = self.models.get(model)
line = self.lines.get(line_name)
part_id = None
if variant_id != None and model_id != None and line != None:
# part_id = self.part.get(part_name.strip()+"-"+str(variant_id)+"-"+str(part_id)+"-"+str(line))
part_id = self.part[part_name.strip()+"-"+str(self.variant[variant])+"-"+str(self.models[model])+"-"+str(line)]
# Check if the data already exists for the given criteria
exiting_production_dates = self.repository.get_existing_plan(part_id,variant_id,model_id,line,list(planned_quantities.keys()),shift)
production_dates_need_to_be_entered = set(planned_quantities.keys()) - set(exiting_production_dates.keys())
# print(exiting_production_dates)
# print(production_dates_need_to_be_entered)
for date in exiting_production_dates.keys():
if planned_quantities.get(date) != None and len(errors) == 0:
self.repository.update_the_existing_planned_quantity(exiting_production_dates[date],planned_quantities.get(date))
for planned_key in production_dates_need_to_be_entered:
plan_model = Plan()
plan_model.line_id = line
plan_model.variant_id = variant_id
plan_model.part_id = part_id
plan_model.production_date = planned_key
plan_model.shift = shift
plan_model.shop_id = shop_id
plan_model.model_id = model_id
plan_model.status = ProdigiPlanStatusEnum.PLANNED.value
plan_model.planned_quantity = planned_quantities[planned_key]
plan_items_need_to_inserted.append(plan_model)
self.insert_plan(plan_items_need_to_inserted)
self.repository.commit()
return errors
def get_models_by_shop_id(self,shop_id):
data = self.repository.get_models_by_shop_id(shop_id=shop_id)
for each in data:
id,name = each
self.models[name] = id
def get_line_by_shop_id(self,shop_id):
data = self.repository.get_line_by_shop_id(shop_id=shop_id)
for each in data:
id,name = each
self.lines[name] = id
def get_variant_by_shop_id(self,shop_id):
data = self.repository.get_variant_by_shop_id(shop_id)
for each in data:
id,name = each
self.variant[name] = id
def get_part_by_shop_id(self,shop_id):
data = self.repository.get_part_by_shop_id(shop_id=shop_id)
for each in data:
id ,name,variant_id,model_id,line_id = each
self.part[name+"-"+str(variant_id)+"-"+str(model_id)+"-"+str(line_id)] = id
return self.part
def get_line_by_line_id(self,shop_id):
data = self.repository.get_line_by_shop_id(shop_id=shop_id)
for each in data:
id,name = each
self.lines[name] = id
def insert_plan(self,plan_items_need_to_inserted):
self.repository.insert_plan(plan_items_need_to_inserted)
def get_plan(self,page_no=1,
page_size=20,
model_list=[],
line_list=[],
variant_list=[],
status=None,
shift=None,
from_dt=None,
to_dt=None,
shop_id=7):
data = self.repository.get_plan(page_no=page_no,
page_size=page_size,
model_list=model_list,
line_list=line_list,
variant_list=variant_list,
status=status,
shift=shift,
from_dt=from_dt,
to_dt=to_dt,
shop_id=shop_id)
return self.transform_plan_data(data)
def transform_plan_data(self,data):
result = []
for each in data :
id, line_id, line_name, model_id, model_name, variant_id, variant_name, production_date, shift, planned_quantity, actual_quantity, status,part_id,part_name = each
result.append({
"id" : id,
"line_id" : line_id,
"line_name" : line_name,
"model_id" : model_id,
"model_name" : model_name,
"variant_id" : variant_id,
"variant_name" : variant_name,
"production_date" : str(production_date),
"shift" : shift,
"planned_quantity" : planned_quantity,
"actual_quantity" : actual_quantity,
"status" : status,
"part_id" : part_id,
"part_name" : part_name
})
return result
def default_plan_job(self):
shop_data = self.transform_shop_data_for_default_plan_job(self.repository.get_all_shops())
today_date = DatetimeHelper.get_ist_datetime().date()
part_unique_comb = {}
for shop_metadata in shop_data:
is_day = shop_metadata["is_day"]
is_shift = shop_metadata["is_shift"]
shop_id = int(shop_metadata["shop_id"])
data = self.repository.get_part_by_shop_id(shop_id=shop_id)
part_unique_comb = self.transform_part_data_for_default_plan_job(data,is_shift)
# self.repository.
def transform_part_data_for_default_plan_job(self,data,is_shift):
part_unique_comb = {}
for each in data:
id, name, variant_id, model_id, line_id = each
if is_shift:
part_unique_comb[
str(id) + "-" + str(variant_id) + "-" + str(model_id) + "-" + str(line_id) + "-" + "A"] = id
part_unique_comb[
str(id) + "-" + str(variant_id) + "-" + str(model_id) + "-" + str(line_id) + "-" + "B"] = id
part_unique_comb[
str(id) + "-" + str(variant_id) + "-" + str(model_id) + "-" + str(line_id) + "-" + "C"] = id
else:
part_unique_comb[
str(id) + "-" + str(variant_id) + "-" + str(model_id) + "-" + str(line_id) + "-" + "_"] = id
return part_unique_comb
def transform_shop_data_for_default_plan_job(self,data):
result = []
for each in data:
id,shop_id,is_day,is_shift = each
result.append({
"id" : id,
"shop_id" : shop_id,
"is_day" : is_day,
"is_shift" : is_shift
})
return result
Editor is loading...
Leave a Comment