# Paste-site metadata (not part of the module):
# Untitled | unknown | plain_text | a year ago | 25 kB | 8 | Indexable
from ..repositories.productivity_repository import ProductivityRepository
from ..services.helpers.datetime_helper import DatetimeHelper
class ProductivityService():
    """Reshape flat productivity rows from the repository into nested dicts.

    Every ``tranform_*`` method walks the repository rows once and builds a
    ``line -> subline -> station -> part`` hierarchy.  Grouping only compares
    each row with the *last* object appended at every level, so rows that
    belong together must arrive adjacent to each other.
    NOTE(review): this presumably relies on the repository query ordering by
    line, subline and station -- confirm against the repository.
    """

    # Clock value the shift arithmetic treats as the end of a day (kept as in
    # the original logic, i.e. one second short of midnight).
    _END_OF_DAY = "23:59:59"

    def __init__(self, repository: "ProductivityRepository") -> None:
        """Store the repository used for every data lookup."""
        self.repository = repository

    # ------------------------------------------------------------------ #
    # Small shared helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _as_list(value):
        """Return ``value`` unchanged, or a fresh empty list when it is None.

        Used instead of mutable ``[]`` default arguments, which would be
        shared between calls.
        """
        return [] if value is None else value

    @staticmethod
    def _row_selected(is_concerned, zone):
        """Tell whether a row should be included in the output.

        A row is kept when ``is_concerned`` is truthy and the zone is 'RED',
        or when ``is_concerned`` is exactly ``False``.  Any other falsy value
        (for example ``None``) selects nothing, as in the original branches.
        """
        return bool(is_concerned and zone == 'RED') or is_concerned is False

    @staticmethod
    def _quantity_zone(planned_qty, station_qty):
        """Return 'GREY' if a quantity is missing, 'RED' if behind plan, else 'GREEN'."""
        if planned_qty is None or station_qty is None:
            return 'GREY'
        return 'RED' if planned_qty > station_qty else 'GREEN'

    @staticmethod
    def _to_seconds(value):
        """Normalise a clock time or a duration to float seconds.

        Accepts objects exposing ``total_seconds()`` (timedelta-like), objects
        exposing ``hour``/``minute``/``second`` (time- or datetime-like, read
        as seconds since midnight), ``"HH:MM[:SS]"`` strings (an optional date
        prefix separated by whitespace is ignored) and plain numbers.
        """
        if hasattr(value, 'total_seconds'):
            return float(value.total_seconds())
        if hasattr(value, 'hour'):
            return float(value.hour * 3600 + value.minute * 60 + value.second)
        if isinstance(value, str):
            parts = value.strip().split()[-1].split(':')
            parts += ['0'] * (3 - len(parts))
            seconds = 0.0
            for part in parts:
                seconds = seconds * 60 + float(part)
            return seconds
        return float(value)

    @staticmethod
    def _efficiency_percent(total_working_time, total_downtime):
        """Return the share of working time not lost to downtime, in percent.

        Returns ``None`` when no working time has elapsed, because the ratio
        is undefined there (the original code would divide by zero).
        """
        if total_working_time <= 0:
            return None
        return ((total_working_time - total_downtime) / total_working_time) * 100

    def _add_station_row(self, line_arr, line, subline, station, zone, feeder):
        """Insert one row into the hierarchy and return its station dict.

        Args:
            line_arr: Output list of line dicts, extended in place.
            line: ``(line_id, line_name)``.
            subline: ``(subline_id, subline_name, standard_value, current_value)``.
            station: ``(station_id, station_name, standard_value, current_value)``.
            zone: Zone label stored on a newly created station.
            feeder: Value appended to the station's ``feeder_line`` list.
        """
        line_id, line_name = line
        subline_id, subline_name, subline_standard, subline_current = subline
        station_id, station_name, station_standard, station_current = station
        line_obj = self.get_line_obj_for_cycle_time(
            line_id=line_id, line_name=line_name, line_arr=line_arr)
        subline_obj = self.get_subline_obj_for_cycle_time(
            subline_id=subline_id,
            subline_name=subline_name,
            subline_arr=line_obj['subline_arr'],
            standard_value=subline_standard,
            current_value=subline_current)
        station_obj = self.get_station_obj_for_cycle_time(
            station_id=station_id,
            station_name=station_name,
            standard_value=station_standard,
            current_value=station_current,
            station_arr=subline_obj['station_arr'],
            zone=zone)
        self.get_feeder_line_arr(station_id, station_obj, feeder)
        return station_obj

    # ------------------------------------------------------------------ #
    # Filters
    # ------------------------------------------------------------------ #
    def get_productivity_filters(self, shop_id):
        """Fetch the filter rows for ``shop_id`` and return them as dicts."""
        data = self.repository.get_productivity_filters(shop_id=shop_id)
        return self.transform_productivity_filters(data)

    def transform_productivity_filters(self, data):
        """Convert 10-field filter rows into dicts keyed by column name.

        Each row must unpack as ``(line_id, line_name, model_id, model_name,
        variant_id, variant_name, subline_id, subline_name, station_id,
        station_name)``.
        """
        keys = ("line_id", "line_name", "model_id", "model_name",
                "variant_id", "variant_name", "subline_id", "subline_name",
                "station_id", "station_name")
        result = []
        for each in data:
            # Explicit unpacking keeps the original "exactly 10 fields" check.
            (line_id, line_name, model_id, model_name, variant_id,
             variant_name, subline_id, subline_name, station_id,
             station_name) = each
            result.append(dict(zip(keys, (
                line_id, line_name, model_id, model_name, variant_id,
                variant_name, subline_id, subline_name, station_id,
                station_name))))
        return result

    # ------------------------------------------------------------------ #
    # Cycle time
    # ------------------------------------------------------------------ #
    def get_productivity_cycle_time_metrics_live(self, shop_id,
                                                 line_arr=None,
                                                 subline_arr=None,
                                                 station_arr=None,
                                                 model_arr=None,
                                                 variant_arr=None,
                                                 is_concerned=False):
        """Return the live cycle-time hierarchy for a shop.

        Filter lists default to empty lists; ``is_concerned`` restricts the
        output to stations whose zone is 'RED'.
        """
        data = self.repository.get_productivity_cycle_time_metrics_live(
            shop_id=shop_id,
            line_arr=self._as_list(line_arr),
            subline_arr=self._as_list(subline_arr),
            station_arr=self._as_list(station_arr),
            model_arr=self._as_list(model_arr),
            variant_arr=self._as_list(variant_arr))
        return self.tranform_productivity_cycle_time_metrics_live(data, is_concerned)

    def get_productivity_cycle_time_metrics_historic(self, shop_id,
                                                     line_arr=None,
                                                     subline_arr=None,
                                                     station_arr=None,
                                                     model_arr=None,
                                                     variant_arr=None,
                                                     from_date=None,
                                                     to_date=None,
                                                     is_concerned=False,
                                                     is_delay=None,
                                                     is_tip_dress=None,
                                                     is_fault=None, based_on=None):
        """Return the historic cycle-time hierarchy for a shop.

        All filter arguments are forwarded to the repository unchanged; the
        rows are shaped with the same transform as the live variant.
        """
        data = self.repository.get_productivity_cycle_time_metrics_historic(
            shop_id=shop_id,
            line_arr=self._as_list(line_arr),
            subline_arr=self._as_list(subline_arr),
            station_arr=self._as_list(station_arr),
            model_arr=self._as_list(model_arr),
            variant_arr=self._as_list(variant_arr),
            from_date=from_date,
            to_date=to_date,
            is_delay=is_delay,
            is_fault=is_fault,
            is_tip_dress=is_tip_dress,
            based_on=based_on)
        return self.tranform_productivity_cycle_time_metrics_live(data, is_concerned)

    def tranform_productivity_cycle_time_metrics_live(self, data, is_concerned):
        """Build the line/subline/station/part hierarchy from cycle-time rows.

        Each row must unpack into 19 fields (see the tuple below).  A missing
        zone is reported as 'GREY'.  After all rows are placed, every
        subline's ``current_value`` is set to the largest station
        ``current_value`` it contains.
        """
        line_arr = []
        for each in data:
            (_max_id, _parameter_id, standard_station_cycle_time,
             current_station_cycle_time, standard_line_cycle_time, line_id,
             line_name, model_id, model_name, subline_id, subline_name,
             variant_id, variant_name, part_id, part_name, station_id,
             station_name, feeder_line, zone) = each
            if zone is None:
                zone = 'GREY'
            if not self._row_selected(is_concerned, zone):
                continue
            station_obj = self._add_station_row(
                line_arr,
                (line_id, line_name),
                # The subline value is a placeholder; it is recomputed below.
                (subline_id, subline_name, standard_line_cycle_time, 0),
                (station_id, station_name, standard_station_cycle_time,
                 current_station_cycle_time),
                zone,
                feeder_line)
            self.get_part_obj_for_cycle_time(
                part_id=part_id, part_name=part_name,
                variant_id=variant_id, variant_name=variant_name,
                model_id=model_id, model_name=model_name,
                part_arr=station_obj['part_arr'])
        for line_obj in line_arr:
            for subline_obj in line_obj['subline_arr']:
                subline_obj['current_value'] = self.get_max_of_line_cycle_time(
                    subline_obj['station_arr'])
        return line_arr

    def get_max_of_line_cycle_time(self, station_arr):
        """Return the largest non-None station ``current_value``, or 0 if none exceeds 0."""
        maxx = 0
        for station_obj in station_arr:
            current_value = station_obj['current_value']
            if current_value is not None and maxx < current_value:
                maxx = current_value
        return maxx

    # ------------------------------------------------------------------ #
    # Hierarchy node builders ("get or append" on the last element)
    # ------------------------------------------------------------------ #
    def get_part_obj_for_cycle_time(self, part_id, part_name, model_id, model_name,
                                    variant_id, variant_name, part_arr):
        """Return the trailing part dict if it matches, else append a new one.

        A match requires the same ``part_id``, ``model_id`` and ``variant_id``
        as the last element of ``part_arr``.  ``part_name`` is stored under
        the key ``part_serial_no``.
        """
        if (part_arr
                and part_arr[-1].get('part_id') == part_id
                and part_arr[-1].get('model_id') == model_id
                and part_arr[-1].get('variant_id') == variant_id):
            return part_arr[-1]
        part_obj = {
            'part_id': part_id,
            'part_serial_no': part_name,
            'model_id': model_id,
            'model_name': model_name,
            'variant_id': variant_id,
            'variant_name': variant_name,
        }
        part_arr.append(part_obj)
        return part_obj

    def get_station_obj_for_cycle_time(self, station_id, station_name,
                                       standard_value, current_value,
                                       station_arr, zone):
        """Return the trailing station dict if its id matches, else append a new one.

        An existing station is returned untouched: its values and zone are
        those of the first row that created it.
        """
        if station_arr and station_arr[-1].get('station_id') == station_id:
            return station_arr[-1]
        station_obj = {
            'station_id': station_id,
            'station_name': station_name,
            'standard_value': standard_value,
            'current_value': current_value,
            'zone': zone,
            'feeder_line': [],
            'part_arr': [],
            'downtime_arr': [],
        }
        station_arr.append(station_obj)
        return station_obj

    def get_subline_obj_for_cycle_time(self, subline_id, subline_name, subline_arr,
                                       standard_value, current_value):
        """Return the trailing subline dict if its id matches, else append a new one."""
        if subline_arr and subline_arr[-1].get('subline_id') == subline_id:
            return subline_arr[-1]
        subline_obj = {
            'subline_id': subline_id,
            'subline_name': subline_name,
            'standard_value': standard_value,
            'current_value': current_value,
            'station_arr': [],
            'downtime_arr': [],
        }
        subline_arr.append(subline_obj)
        return subline_obj

    def get_line_obj_for_cycle_time(self, line_id, line_name, line_arr):
        """Return the trailing line dict if its id matches, else append a new one."""
        if line_arr and line_arr[-1].get('line_id') == line_id:
            return line_arr[-1]
        line_obj = {
            'line_id': line_id,
            'line_name': line_name,
            'subline_arr': [],
        }
        line_arr.append(line_obj)
        return line_obj

    # ------------------------------------------------------------------ #
    # Production quantities
    # ------------------------------------------------------------------ #
    def productivity_production_metrics_live(self, shop_id,
                                             line_arr=None,
                                             subline_arr=None,
                                             station_arr=None,
                                             model_arr=None,
                                             variant_arr=None,
                                             from_date=None,
                                             to_date=None,
                                             is_concerned=False):
        """Return the live production-quantity hierarchy (with parts) for a shop."""
        data = self.repository.productivity_production_metrics_live(
            shop_id=shop_id,
            line_arr=self._as_list(line_arr),
            subline_arr=self._as_list(subline_arr),
            station_arr=self._as_list(station_arr),
            model_arr=self._as_list(model_arr),
            variant_arr=self._as_list(variant_arr),
            from_date=from_date,
            to_date=to_date)
        return self.tranform_productivity_production_metrics(data, is_concerned)

    def productivity_production_metrics_historic(self, shop_id,
                                                 line_arr=None,
                                                 subline_arr=None,
                                                 station_arr=None,
                                                 model_arr=None,
                                                 variant_arr=None,
                                                 from_date=None,
                                                 to_date=None,
                                                 is_concerned=False):
        """Return the historic production-quantity hierarchy (no parts) for a shop."""
        data = self.repository.productivity_production_metrics_historic(
            shop_id=shop_id,
            line_arr=self._as_list(line_arr),
            subline_arr=self._as_list(subline_arr),
            station_arr=self._as_list(station_arr),
            model_arr=self._as_list(model_arr),
            variant_arr=self._as_list(variant_arr),
            from_date=from_date,
            to_date=to_date)
        return self.tranform_productivity_production_metrics_historic(data, is_concerned)

    def tranform_productivity_production_metrics_historic(self, data, is_concerned):
        """Build the hierarchy from 10-field historic production rows.

        The zone is derived from the quantities: 'GREY' when either is
        missing, 'RED' when ``planned_qty > station_qty``, otherwise 'GREEN'.
        No part entries are created for historic rows.
        """
        line_arr = []
        for each in data:
            (line_id, line_name, subline_id, subline_name, station_id,
             station_name, planned_qty, subline_qty, station_qty,
             feeded_subline_id) = each
            zone = self._quantity_zone(planned_qty, station_qty)
            if not self._row_selected(is_concerned, zone):
                continue
            self._add_station_row(
                line_arr,
                (line_id, line_name),
                (subline_id, subline_name, planned_qty, subline_qty),
                (station_id, station_name, planned_qty, station_qty),
                zone,
                feeded_subline_id)
        return line_arr

    def tranform_productivity_production_metrics(self, data, is_concerned):
        """Build the hierarchy, including parts, from 18-field live production rows.

        Zones are derived exactly as in the historic transform.  The part
        entry is labelled with the row's ``part_serial_no`` field; the row's
        ``part_name`` and ``responsible_station`` fields are not used.
        """
        line_arr = []
        for each in data:
            (line_id, line_name, planned_qty, subline_id, subline_name,
             station_id, station_name, model_id, model_name, variant_id,
             variant_name, part_id, _part_name, subline_qty,
             _responsible_station, feeded_subline_id, station_qty,
             part_serial_no) = each
            zone = self._quantity_zone(planned_qty, station_qty)
            if not self._row_selected(is_concerned, zone):
                continue
            station_obj = self._add_station_row(
                line_arr,
                (line_id, line_name),
                (subline_id, subline_name, planned_qty, subline_qty),
                (station_id, station_name, planned_qty, station_qty),
                zone,
                feeded_subline_id)
            self.get_part_obj_for_cycle_time(
                part_id, part_serial_no, model_id, model_name,
                variant_id, variant_name, station_obj['part_arr'])
        return line_arr

    def get_feeder_line_arr(self, station_id, station_obj, feeded_subline_id):
        """Append ``feeded_subline_id`` to the station's ``feeder_line`` list.

        Nothing is appended when ``station_obj`` belongs to another station.
        The value is appended once per call, so repeated rows for the same
        station produce repeated entries (and ``None`` is appended as-is).
        """
        if station_obj.get('station_id') == station_id:
            station_obj.get('feeder_line').append(feeded_subline_id)

    # ------------------------------------------------------------------ #
    # Efficiency
    # ------------------------------------------------------------------ #
    def get_productivity_efficiency(self,
                                    shop_id,
                                    line_arr=None,
                                    subline_arr=None,
                                    station_arr=None,
                                    model_arr=None,
                                    variant_arr=None):
        """Return the live efficiency hierarchy for a shop.

        The repository is expected to return a ``(downtime_data, break_data)``
        pair.  The computed hierarchy is returned (the original code computed
        it and then dropped it).
        """
        downtime_data, break_data = self.repository.productivity_efficiency(
            shop_id=shop_id,
            line_arr=self._as_list(line_arr),
            subline_arr=self._as_list(subline_arr),
            station_arr=self._as_list(station_arr),
            model_arr=self._as_list(model_arr),
            variant_arr=self._as_list(variant_arr))
        return self.get_productivity_efficiency_metrics_live(downtime_data, break_data)

    def get_productivity_efficiency_metrics_live(self, downtime_data, break_data,
                                                 current_time=None,
                                                 shift_start="06:30:00"):
        """Build the hierarchy and fill in efficiency percentages.

        Args:
            downtime_data: 14-field rows ending in ``start_time, end_time`` of
                one downtime interval.
            break_data: Rows of ``(break_start_time, break_end_time, shop_id)``.
            current_time: Clock time to evaluate at; defaults to
                ``DatetimeHelper.get_ist_datetime()``.
            shift_start: Clock time the working day starts at.

        Returns:
            The list of line dicts.  Each subline and station gets its
            ``downtime_arr`` merged into non-overlapping intervals and its
            ``current_value`` set to
            ``(working_time - downtime) / working_time * 100`` (``None`` when
            no working time has elapsed).
        """
        line_arr = []
        for each in downtime_data:
            (line_id, line_name, subline_id, subline_name,
             station_id, station_name, model_id, model_name,
             variant_id, variant_name, part_id, part_name,
             start_time, end_time) = each
            line_obj = self.get_line_obj_for_cycle_time(
                line_arr=line_arr, line_id=line_id, line_name=line_name)
            subline_obj = self.get_subline_obj_for_productivity(
                subline_id, subline_name, line_obj['subline_arr'], 0, 0)
            self.add_downtime_in_obj(subline_obj, start_time, end_time)
            station_obj = self.get_station_obj_for_cycle_time(
                station_id, station_name, 0, 0, subline_obj['station_arr'], None)
            self.add_downtime_in_obj(station_obj, start_time, end_time)
            self.get_part_obj_for_cycle_time(
                part_id, part_name, model_id, model_name,
                variant_id, variant_name, station_obj['part_arr'])

        # Read the clock once so breaks and working time use the same instant.
        if current_time is None:
            current_time = DatetimeHelper.get_ist_datetime()
        total_break_time = self.get_break_time(break_data, shift_start, current_time)
        total_working_time = self.get_total_working_time(
            shift_start, total_break_time, current_time)

        for line_obj in line_arr:
            for subline_obj in line_obj['subline_arr']:
                self._apply_efficiency(subline_obj, total_working_time)
                for station_obj in subline_obj['station_arr']:
                    self._apply_efficiency(station_obj, total_working_time)
        return line_arr

    def _apply_efficiency(self, obj, total_working_time):
        """Merge ``obj``'s downtime intervals and store its efficiency percentage."""
        obj['downtime_arr'] = self.remove_downtime_overlapping(obj.get('downtime_arr'))
        total_downtime = self.get_overall_downtime(obj['downtime_arr'])
        obj['current_value'] = self._efficiency_percent(total_working_time, total_downtime)

    def get_overall_downtime(self, downtime_arr):
        """Return the summed length of all ``[start, end]`` intervals, in seconds.

        ``end - start`` may be a timedelta-like object or a plain number of
        seconds.
        NOTE(review): assumes every interval has both ends set -- confirm how
        still-open downtimes are represented.
        """
        total_downtime = 0.0
        for each in downtime_arr:
            total_downtime += self._to_seconds(each[1] - each[0])
        return total_downtime

    def get_total_working_time(self, start_time, break_time, current_time=None):
        """Return seconds worked since ``start_time``, minus ``break_time``.

        When the current clock time is earlier than ``start_time`` the shift
        is taken to have started the previous day, so the time up to
        ``23:59:59`` plus the time since midnight is counted.

        Args:
            start_time: Shift start as a clock value (see ``_to_seconds``).
            break_time: Break duration to subtract, in seconds.
            current_time: Clock time to evaluate at; defaults to
                ``DatetimeHelper.get_ist_datetime()``.
        """
        if current_time is None:
            current_time = DatetimeHelper.get_ist_datetime()
        now = self._to_seconds(current_time)
        start = self._to_seconds(start_time)
        if now >= start:
            total_working_time = now - start
        else:
            total_working_time = (self._to_seconds(self._END_OF_DAY) - start) + now
        return total_working_time - self._to_seconds(break_time)

    def get_break_time(self, break_data, start_time, current_time=None):
        """Return the total length, in seconds, of breaks that have started.

        A break counts in full once its start lies in ``(start_time, now]``
        (same-day case) or in ``(00:00:00, now]`` (after-midnight case).
        NOTE(review): in the after-midnight case breaks that began before
        midnight are not counted, as in the original logic -- confirm intent.

        Args:
            break_data: Rows of ``(break_start_time, break_end_time, shop_id)``.
            start_time: Shift start as a clock value.
            current_time: Clock time to evaluate at; defaults to
                ``DatetimeHelper.get_ist_datetime()``.
        """
        if current_time is None:
            current_time = DatetimeHelper.get_ist_datetime()
        now = self._to_seconds(current_time)
        start = self._to_seconds(start_time)
        if start < now < self._to_seconds(self._END_OF_DAY):
            window_open = start
        elif 0 < now < start:
            window_open = 0
        else:
            return 0.0
        total_break_time = 0.0
        for break_start_time, break_end_time, _shop_id in break_data:
            break_start = self._to_seconds(break_start_time)
            if window_open < break_start <= now:
                total_break_time += self._to_seconds(break_end_time) - break_start
        return total_break_time

    def get_station_obj_for_efficiency(self, station_id, station_name,
                                       standard_value, current_value,
                                       station_arr, zone):
        """Same contract as ``get_station_obj_for_cycle_time``; kept for callers."""
        return self.get_station_obj_for_cycle_time(
            station_id, station_name, standard_value, current_value,
            station_arr, zone)

    def get_subline_obj_for_productivity(self, subline_id, subline_name, subline_arr,
                                         standard_value, current_value):
        """Same contract as ``get_subline_obj_for_cycle_time``; kept for callers."""
        return self.get_subline_obj_for_cycle_time(
            subline_id, subline_name, subline_arr, standard_value, current_value)

    def add_downtime_in_obj(self, obj, start_time, end_time):
        """Append ``[start_time, end_time]`` to ``obj['downtime_arr']``."""
        obj.get('downtime_arr').append([start_time, end_time])

    def remove_downtime_overlapping(self, downtime_arr):
        """Merge overlapping or touching intervals into a new sorted list.

        Args:
            downtime_arr: Iterable of ``[start, end]`` pairs with mutually
                comparable ends.

        Returns:
            A new list of ``[start, end]`` lists ordered by start, in which no
            interval begins at or before the end of the previous one.  The
            input is not modified.
        """
        merged = []
        for start, end in sorted(downtime_arr, key=lambda interval: interval[0]):
            if merged and start <= merged[-1][1]:
                # Overlaps (or touches) the previous interval: extend it.
                if end > merged[-1][1]:
                    merged[-1][1] = end
            else:
                merged.append([start, end])
        return merged
# def get_line_obj(self,line_id,line_name,start_time,end_time):
# Paste-site footer (not part of the module): Editor is loading... | Leave a Comment