Untitled
unknown
plain_text
a year ago
25 kB
7
Indexable
from ..repositories.productivity_repository import ProductivityRepository
from ..services.helpers.datetime_helper import DatetimeHelper


class ProductivityService():
    """Turn flat productivity rows from the repository into nested dicts.

    Every ``tranform_*`` / ``get_*_metrics_live`` method walks a sequence of
    row tuples and groups them into the hierarchy
    ``line -> subline_arr -> station_arr -> part_arr``.

    Grouping only compares a row with the *last* object appended at each
    level, so rows for the same line / subline / station must arrive
    adjacent to each other (presumably the repository orders them that
    way - TODO confirm against the queries).
    """

    # Shift start used as the origin of the efficiency calculation ("HH:MM:SS").
    SHIFT_START_TIME = "06:30:00"
    _SECONDS_PER_DAY = 24 * 60 * 60

    def __init__(self, repository: ProductivityRepository) -> None:
        self.repository = repository

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _list_or_empty(value):
        """Return ``value``, or a fresh empty list when ``value`` is None."""
        return [] if value is None else value

    @staticmethod
    def _is_row_included(is_concerned, zone):
        """Decide whether a row is kept for the requested view.

        A truthy ``is_concerned`` keeps only ``'RED'`` rows. Otherwise the
        row is kept only when ``is_concerned`` is exactly ``False``; other
        falsy values such as ``None`` keep nothing, which mirrors the
        original ``elif is_concerned is False`` check.
        """
        if is_concerned:
            return zone == 'RED'
        return is_concerned is False

    @staticmethod
    def _production_zone(planned_qty, station_qty):
        """Return 'GREY' when a quantity is missing, 'RED' when behind plan, else 'GREEN'."""
        if planned_qty is None or station_qty is None:
            return 'GREY'
        return 'RED' if planned_qty > station_qty else 'GREEN'

    def _append_station_row(self, line_arr, line_id, line_name,
                            subline_id, subline_name,
                            subline_standard, subline_current,
                            station_id, station_name,
                            station_standard, station_current,
                            zone, feeder_subline_id):
        """Find or create the line/subline/station objects for one row.

        Also records ``feeder_subline_id`` on the station. Returns the
        station dict so the caller can attach parts to it.
        """
        line_obj = self.get_line_obj_for_cycle_time(
            line_id=line_id, line_name=line_name, line_arr=line_arr)
        subline_obj = self.get_subline_obj_for_cycle_time(
            subline_id=subline_id, subline_name=subline_name,
            subline_arr=line_obj['subline_arr'],
            standard_value=subline_standard, current_value=subline_current)
        station_obj = self.get_station_obj_for_cycle_time(
            station_id=station_id, station_name=station_name,
            standard_value=station_standard, current_value=station_current,
            station_arr=subline_obj['station_arr'], zone=zone)
        self.get_feeder_line_arr(station_id, station_obj, feeder_subline_id)
        return station_obj

    # ------------------------------------------------------------------
    # Filters
    # ------------------------------------------------------------------

    def get_productivity_filters(self, shop_id):
        """Fetch the filter rows for ``shop_id`` and return them as dicts."""
        data = self.repository.get_productivity_filters(shop_id=shop_id)
        return self.transform_productivity_filters(data)

    def transform_productivity_filters(self, data):
        """Convert 10-field filter rows into a list of dicts.

        Each row must unpack into (line_id, line_name, model_id, model_name,
        variant_id, variant_name, subline_id, subline_name, station_id,
        station_name); a row of any other length raises ``ValueError``.
        """
        result = []
        for each in data:
            (line_id, line_name, model_id, model_name, variant_id,
             variant_name, subline_id, subline_name, station_id,
             station_name) = each
            result.append({
                "line_id": line_id,
                "line_name": line_name,
                "model_id": model_id,
                "model_name": model_name,
                "variant_id": variant_id,
                "variant_name": variant_name,
                "subline_id": subline_id,
                "subline_name": subline_name,
                "station_id": station_id,
                "station_name": station_name,
            })
        return result

    # ------------------------------------------------------------------
    # Cycle time
    # ------------------------------------------------------------------

    def get_productivity_cycle_time_metrics_live(self, shop_id, line_arr=None,
                                                 subline_arr=None,
                                                 station_arr=None,
                                                 model_arr=None,
                                                 variant_arr=None,
                                                 is_concerned=False):
        """Return the live cycle-time hierarchy for ``shop_id``.

        Filter arguments left as ``None`` are passed to the repository as
        empty lists (a fresh list per call, never a shared default).
        """
        data = self.repository.get_productivity_cycle_time_metrics_live(
            shop_id=shop_id,
            line_arr=self._list_or_empty(line_arr),
            subline_arr=self._list_or_empty(subline_arr),
            station_arr=self._list_or_empty(station_arr),
            model_arr=self._list_or_empty(model_arr),
            variant_arr=self._list_or_empty(variant_arr))
        return self.tranform_productivity_cycle_time_metrics_live(data, is_concerned)

    def get_productivity_cycle_time_metrics_historic(self, shop_id,
                                                     line_arr=None,
                                                     subline_arr=None,
                                                     station_arr=None,
                                                     model_arr=None,
                                                     variant_arr=None,
                                                     from_date=None,
                                                     to_date=None,
                                                     is_concerned=False,
                                                     is_delay=None,
                                                     is_tip_dress=None,
                                                     is_fault=None,
                                                     based_on=None):
        """Return the historic cycle-time hierarchy for ``shop_id``.

        The date range and the ``is_delay`` / ``is_fault`` /
        ``is_tip_dress`` / ``based_on`` options are forwarded unchanged to
        the repository. The rows are shaped by the same transform as the
        live variant.
        """
        data = self.repository.get_productivity_cycle_time_metrics_historic(
            shop_id=shop_id,
            line_arr=self._list_or_empty(line_arr),
            subline_arr=self._list_or_empty(subline_arr),
            station_arr=self._list_or_empty(station_arr),
            model_arr=self._list_or_empty(model_arr),
            variant_arr=self._list_or_empty(variant_arr),
            from_date=from_date,
            to_date=to_date,
            is_delay=is_delay,
            is_fault=is_fault,
            is_tip_dress=is_tip_dress,
            based_on=based_on)
        return self.tranform_productivity_cycle_time_metrics_live(data, is_concerned)

    def tranform_productivity_cycle_time_metrics_live(self, data, is_concerned):
        """Group 19-field cycle-time rows into the line hierarchy.

        A missing ``zone`` is reported as ``'GREY'``. After grouping, each
        subline's ``current_value`` is set to the largest station
        ``current_value`` found under it.
        """
        line_arr = []
        for each in data:
            # max_id and parameter_id are part of the row but not used here.
            (max_id, parameter_id, standard_station_cycle_time,
             current_station_cycle_time, standard_line_cycle_time,
             line_id, line_name, model_id, model_name, subline_id,
             subline_name, variant_id, variant_name, part_id, part_name,
             station_id, station_name, feeder_line, zone) = each
            if zone is None:
                zone = 'GREY'
            if not self._is_row_included(is_concerned, zone):
                continue
            station_obj = self._append_station_row(
                line_arr, line_id, line_name,
                subline_id, subline_name,
                standard_line_cycle_time, 0,
                station_id, station_name,
                standard_station_cycle_time, current_station_cycle_time,
                zone, feeder_line)
            self.get_part_obj_for_cycle_time(
                part_id=part_id, part_name=part_name,
                variant_id=variant_id, variant_name=variant_name,
                model_id=model_id, model_name=model_name,
                part_arr=station_obj['part_arr'])

        for line_obj in line_arr:
            for subline_obj in line_obj['subline_arr']:
                subline_obj['current_value'] = self.get_max_of_line_cycle_time(
                    subline_obj['station_arr'])
        return line_arr

    def get_max_of_line_cycle_time(self, station_arr):
        """Return the largest non-None station ``current_value``, or 0."""
        maxx = 0
        for station_obj in station_arr:
            value = station_obj['current_value']
            if value is not None and maxx < value:
                maxx = value
        return maxx

    # ------------------------------------------------------------------
    # Find-or-create helpers for the hierarchy
    # ------------------------------------------------------------------

    def get_part_obj_for_cycle_time(self, part_id, part_name, model_id,
                                    model_name, variant_id, variant_name,
                                    part_arr):
        """Return the last part in ``part_arr`` if it matches, else append a new one.

        A match requires the same part_id, model_id and variant_id.
        ``part_name`` is stored under the ``'part_serial_no'`` key.
        """
        if part_arr:
            last = part_arr[-1]
            if (last.get('part_id') == part_id
                    and last.get('model_id') == model_id
                    and last.get('variant_id') == variant_id):
                return last
        part_obj = {
            'part_id': part_id,
            'part_serial_no': part_name,
            'model_id': model_id,
            'model_name': model_name,
            'variant_id': variant_id,
            'variant_name': variant_name,
        }
        part_arr.append(part_obj)
        return part_obj

    def get_station_obj_for_cycle_time(self, station_id, station_name,
                                       standard_value, current_value,
                                       station_arr, zone):
        """Return the last station in ``station_arr`` if its id matches, else append a new one."""
        if station_arr and station_arr[-1].get('station_id') == station_id:
            return station_arr[-1]
        station_obj = {
            'station_id': station_id,
            'station_name': station_name,
            'standard_value': standard_value,
            'current_value': current_value,
            'zone': zone,
            'feeder_line': [],
            'part_arr': [],
            'downtime_arr': [],
        }
        station_arr.append(station_obj)
        return station_obj

    def get_subline_obj_for_cycle_time(self, subline_id, subline_name,
                                       subline_arr, standard_value,
                                       current_value):
        """Return the last subline in ``subline_arr`` if its id matches, else append a new one."""
        if subline_arr and subline_arr[-1].get('subline_id') == subline_id:
            return subline_arr[-1]
        subline_obj = {
            'subline_id': subline_id,
            'subline_name': subline_name,
            'standard_value': standard_value,
            'current_value': current_value,
            'station_arr': [],
            'downtime_arr': [],
        }
        subline_arr.append(subline_obj)
        return subline_obj

    def get_line_obj_for_cycle_time(self, line_id, line_name, line_arr):
        """Return the last line in ``line_arr`` if its id matches, else append a new one."""
        if line_arr and line_arr[-1].get('line_id') == line_id:
            return line_arr[-1]
        line_obj = {
            'line_id': line_id,
            'line_name': line_name,
            'subline_arr': [],
        }
        line_arr.append(line_obj)
        return line_obj

    def get_station_obj_for_efficiency(self, station_id, station_name,
                                       standard_value, current_value,
                                       station_arr, zone):
        """Same find-or-create behaviour as ``get_station_obj_for_cycle_time``."""
        return self.get_station_obj_for_cycle_time(
            station_id=station_id, station_name=station_name,
            standard_value=standard_value, current_value=current_value,
            station_arr=station_arr, zone=zone)

    def get_subline_obj_for_productivity(self, subline_id, subline_name,
                                         subline_arr, standard_value,
                                         current_value):
        """Same find-or-create behaviour as ``get_subline_obj_for_cycle_time``."""
        return self.get_subline_obj_for_cycle_time(
            subline_id=subline_id, subline_name=subline_name,
            subline_arr=subline_arr, standard_value=standard_value,
            current_value=current_value)

    def get_feeder_line_arr(self, station_id, station_obj, feeded_subline_id):
        """Append ``feeded_subline_id`` to the station's ``feeder_line`` list.

        Nothing is appended when ``station_obj`` belongs to a different
        station id. The value is appended once per call, so repeated rows
        for one station add repeated entries.
        """
        if station_obj.get('station_id') == station_id:
            station_obj.get('feeder_line').append(feeded_subline_id)

    # ------------------------------------------------------------------
    # Production
    # ------------------------------------------------------------------

    def productivity_production_metrics_live(self, shop_id, line_arr=None,
                                             subline_arr=None,
                                             station_arr=None,
                                             model_arr=None,
                                             variant_arr=None,
                                             from_date=None, to_date=None,
                                             is_concerned=False):
        """Return the live production hierarchy (with parts) for ``shop_id``."""
        data = self.repository.productivity_production_metrics_live(
            shop_id=shop_id,
            line_arr=self._list_or_empty(line_arr),
            subline_arr=self._list_or_empty(subline_arr),
            station_arr=self._list_or_empty(station_arr),
            model_arr=self._list_or_empty(model_arr),
            variant_arr=self._list_or_empty(variant_arr),
            from_date=from_date,
            to_date=to_date)
        return self.tranform_productivity_production_metrics(data, is_concerned)

    def productivity_production_metrics_historic(self, shop_id, line_arr=None,
                                                 subline_arr=None,
                                                 station_arr=None,
                                                 model_arr=None,
                                                 variant_arr=None,
                                                 from_date=None, to_date=None,
                                                 is_concerned=False):
        """Return the historic production hierarchy (no parts) for ``shop_id``."""
        data = self.repository.productivity_production_metrics_historic(
            shop_id=shop_id,
            line_arr=self._list_or_empty(line_arr),
            subline_arr=self._list_or_empty(subline_arr),
            station_arr=self._list_or_empty(station_arr),
            model_arr=self._list_or_empty(model_arr),
            variant_arr=self._list_or_empty(variant_arr),
            from_date=from_date,
            to_date=to_date)
        return self.tranform_productivity_production_metrics_historic(data, is_concerned)

    def tranform_productivity_production_metrics_historic(self, data, is_concerned):
        """Group 10-field historic production rows into the line hierarchy.

        The zone is derived from planned vs. station quantity. These rows
        carry no part information, so ``part_arr`` stays empty.
        """
        line_arr = []
        for each in data:
            (line_id, line_name, subline_id, subline_name, station_id,
             station_name, planned_qty, subline_qty, station_qty,
             feeded_subline_id) = each
            zone = self._production_zone(planned_qty, station_qty)
            if not self._is_row_included(is_concerned, zone):
                continue
            self._append_station_row(
                line_arr, line_id, line_name,
                subline_id, subline_name,
                planned_qty, subline_qty,
                station_id, station_name,
                planned_qty, station_qty,
                zone, feeded_subline_id)
        return line_arr

    def tranform_productivity_production_metrics(self, data, is_concerned):
        """Group 18-field live production rows into the line hierarchy.

        The zone is derived from planned vs. station quantity. Each kept
        row also adds (or reuses) a part under its station, stored with
        ``part_serial_no`` as the part's ``'part_serial_no'`` value.
        """
        line_arr = []
        for each in data:
            # part_name and responsible_station are in the row but unused.
            (line_id, line_name, planned_qty, subline_id, subline_name,
             station_id, station_name, model_id, model_name, variant_id,
             variant_name, part_id, part_name, subline_qty,
             responsible_station, feeded_subline_id, station_qty,
             part_serial_no) = each
            zone = self._production_zone(planned_qty, station_qty)
            if not self._is_row_included(is_concerned, zone):
                continue
            station_obj = self._append_station_row(
                line_arr, line_id, line_name,
                subline_id, subline_name,
                planned_qty, subline_qty,
                station_id, station_name,
                planned_qty, station_qty,
                zone, feeded_subline_id)
            self.get_part_obj_for_cycle_time(
                part_id, part_serial_no, model_id, model_name,
                variant_id, variant_name, station_obj['part_arr'])
        return line_arr

    # ------------------------------------------------------------------
    # Efficiency
    # ------------------------------------------------------------------

    def get_productivity_efficiency(self, shop_id, line_arr=None,
                                    subline_arr=None, station_arr=None,
                                    model_arr=None, variant_arr=None):
        """Return the efficiency hierarchy for ``shop_id``.

        The repository call is expected to return a
        ``(downtime_data, break_data)`` pair.
        """
        downtime_data, break_data = self.repository.productivity_efficiency(
            shop_id=shop_id,
            line_arr=self._list_or_empty(line_arr),
            subline_arr=self._list_or_empty(subline_arr),
            station_arr=self._list_or_empty(station_arr),
            model_arr=self._list_or_empty(model_arr),
            variant_arr=self._list_or_empty(variant_arr))
        # The original computed the metrics but dropped them on the floor.
        return self.get_productivity_efficiency_metrics_live(downtime_data, break_data)

    def get_productivity_efficiency_metrics_live(self, downtime_data, break_data):
        """Group 14-field downtime rows and compute efficiency percentages.

        Each row's ``[start_time, end_time]`` is recorded on both its
        subline and its station. Overlapping intervals are then merged and
        ``current_value`` is set to
        ``(working_time - downtime) / working_time * 100`` for every
        subline and station.

        NOTE(review): the percentage is only meaningful if downtime
        start/end are datetime-like (their difference is converted to
        seconds) or already numeric seconds - confirm against the
        repository.
        """
        line_arr = []
        for each in downtime_data:
            (line_id, line_name, subline_id, subline_name,
             station_id, station_name, model_id, model_name,
             variant_id, variant_name, part_id, part_name,
             start_time, end_time) = each
            line_obj = self.get_line_obj_for_cycle_time(
                line_arr=line_arr, line_id=line_id, line_name=line_name)
            subline_obj = self.get_subline_obj_for_productivity(
                subline_id, subline_name, line_obj['subline_arr'], 0, 0)
            self.add_downtime_in_obj(subline_obj, start_time, end_time)
            station_obj = self.get_station_obj_for_cycle_time(
                station_id, station_name, 0, 0,
                subline_obj['station_arr'], None)
            self.add_downtime_in_obj(station_obj, start_time, end_time)
            self.get_part_obj_for_cycle_time(
                part_id, part_name, model_id, model_name,
                variant_id, variant_name, station_obj['part_arr'])

        if not line_arr:
            return line_arr

        # Read the clock once so every subline/station shares one time base.
        shift_start = self.SHIFT_START_TIME
        total_break_time = self.get_break_time(break_data, shift_start)
        total_working_time = self.get_total_working_time(shift_start, total_break_time)

        for line_obj in line_arr:
            for subline_obj in line_obj['subline_arr']:
                subline_obj['downtime_arr'] = self.remove_downtime_overlapping(
                    subline_obj.get('downtime_arr'))
                subline_obj['current_value'] = self._efficiency_percent(
                    total_working_time,
                    self.get_overall_downtime(subline_obj['downtime_arr']))
                for station_obj in subline_obj['station_arr']:
                    station_obj['downtime_arr'] = self.remove_downtime_overlapping(
                        station_obj.get('downtime_arr'))
                    # Divide by working time, as for sublines; the original
                    # divided by the station downtime here by mistake.
                    station_obj['current_value'] = self._efficiency_percent(
                        total_working_time,
                        self.get_overall_downtime(station_obj['downtime_arr']))
        return line_arr

    @staticmethod
    def _efficiency_percent(working_time, downtime):
        """Return the share of ``working_time`` not lost to ``downtime``, in percent.

        Returns 0 when there is no positive working time yet, to avoid
        dividing by zero.
        """
        if working_time <= 0:
            return 0
        return ((working_time - downtime) / working_time) * 100

    @staticmethod
    def get_overall_downtime(downtime_arr):
        """Sum ``end - start`` over ``[start, end]`` pairs.

        Differences that expose ``total_seconds()`` (``timedelta``) are
        converted to seconds; any other difference is added as-is.
        Returns 0 for an empty list.
        """
        total_downtime = 0
        for start, end in downtime_arr:
            span = end - start
            to_seconds = getattr(span, 'total_seconds', None)
            total_downtime += to_seconds() if callable(to_seconds) else span
        return total_downtime

    @staticmethod
    def _seconds_of_day(value):
        """Convert a time-of-day value into seconds since midnight.

        Accepts an ``"HH:MM:SS"`` string, any object with
        ``hour``/``minute``/``second`` attributes (``datetime``, ``time``),
        a ``timedelta``-like object (wrapped into one day), or a number
        that is already seconds.

        Raises:
            ValueError: if a string is not in ``HH:MM:SS`` form.
        """
        if isinstance(value, str):
            hours, minutes, seconds = value.strip().split(':')
            return int(hours) * 3600 + int(minutes) * 60 + float(seconds)
        if hasattr(value, 'hour'):
            return (value.hour * 3600 + value.minute * 60 + value.second
                    + getattr(value, 'microsecond', 0) / 1e6)
        if hasattr(value, 'total_seconds'):
            return value.total_seconds() % ProductivityService._SECONDS_PER_DAY
        return value

    def _elapsed_since(self, start_time, moment):
        """Seconds from ``start_time`` to ``moment``, wrapping past midnight."""
        delta = self._seconds_of_day(moment) - self._seconds_of_day(start_time)
        return delta % self._SECONDS_PER_DAY

    def get_total_working_time(self, start_time, break_time):
        """Return seconds elapsed since ``start_time`` today, minus ``break_time``.

        When the current time of day is earlier than ``start_time`` the
        shift is taken to have started the previous day.

        NOTE(review): assumes ``DatetimeHelper.get_ist_datetime()`` returns
        a datetime/time-like object or an ``"HH:MM:SS"`` string - confirm.
        """
        elapsed = self._elapsed_since(start_time, DatetimeHelper.get_ist_datetime())
        return elapsed - break_time

    def get_break_time(self, break_data, start_time):
        """Return total seconds of breaks that began after ``start_time`` and by now.

        Each row is ``(break_start_time, break_end_time, shop_id)``. A
        break is counted in full once it has started, even if still in
        progress. Both the "started since shift start" test and the break
        length are computed modulo one day, so shifts and breaks that
        cross midnight are handled.
        """
        # One clock read for all rows (the original re-read it per row).
        elapsed_now = self._elapsed_since(start_time, DatetimeHelper.get_ist_datetime())
        total_break_time = 0
        for break_start_time, break_end_time, _shop_id in break_data:
            offset = self._elapsed_since(start_time, break_start_time)
            if 0 < offset <= elapsed_now:
                total_break_time += self._elapsed_since(break_start_time, break_end_time)
        return total_break_time

    def add_downtime_in_obj(self, obj, start_time, end_time):
        """Append ``[start_time, end_time]`` to ``obj['downtime_arr']``."""
        obj.get('downtime_arr').append([start_time, end_time])

    def remove_downtime_overlapping(self, downtime_arr):
        """Merge overlapping or touching ``[start, end]`` intervals.

        Returns a new list of ``[start, end]`` lists sorted by start; the
        input list is not modified.

        NOTE(review): intervals with a ``None`` bound are skipped because
        they cannot be ordered or measured; confirm whether open-ended
        downtime should instead be clipped to the current time.
        """
        intervals = sorted(
            ([start, end] for start, end in (downtime_arr or [])
             if start is not None and end is not None),
            key=lambda interval: interval[0])
        merged = []
        for start, end in intervals:
            if merged and start <= merged[-1][1]:
                # Overlaps (or touches) the previous interval: extend it.
                if end > merged[-1][1]:
                    merged[-1][1] = end
            else:
                merged.append([start, end])
        return merged

    # def get_line_obj(self,line_id,line_name,start_time,end_time):
Editor is loading...
Leave a Comment