Untitled
unknown
plain_text
2 years ago
18 kB
14
Indexable
import json
import logging
from datetime import datetime

from PG.repositories.insight_repository import InsightRepository
from PG.repositories.models.parameter_measurement import ParameterMeasurement
from PG.services.helpers.insight_standard_helper import InsightStatus
from PG.services.helpers.string_helpers import StringHelpers
class InsightService():
    """Service layer that fetches insight rows from a repository and reshapes them.

    Every ``get_*`` method delegates the query to ``self.repository`` and then
    passes the rows through the matching ``transform_*`` method to build plain
    dict/list structures.
    """

    def __init__(self, repository: "InsightRepository"):
        """Store the repository used for every query issued by this service."""
        self.repository = repository
        self.model_type = ParameterMeasurement

    def get_latest_insights_with_filters(self, shop_id,
                                         line_arr=None,
                                         subline_arr=None,
                                         equipment_name_arr=None,
                                         equipment_number_arr=None,
                                         parameter_name_arr=None,
                                         station_arr=None,
                                         model_arr=None,
                                         variant_arr=None,
                                         part_name_arr=None,
                                         remarks_arr=None,
                                         shift_arr=None,
                                         end_time=None,
                                         start_time=None,
                                         record_per_page='all',
                                         page_number=1,
                                         zone=None,
                                         count_order='default'
                                         ):
        """Return the latest insights for a shop, grouped per parameter.

        All arguments are forwarded unchanged to
        ``repository.get_latest_insights_with_filters``; the rows it returns
        are reshaped by :meth:`transform_latest_insights`.

        Returns:
            dict: ``{'parameters_insight': [...]}`` with one entry per
            parameter id.
        """
        data = self.repository.get_latest_insights_with_filters(
            shop_id=shop_id,
            line_arr=line_arr,
            subline_arr=subline_arr,
            equipment_name_arr=equipment_name_arr,
            equipment_number_arr=equipment_number_arr,
            parameter_name_arr=parameter_name_arr,
            station_arr=station_arr,
            model_arr=model_arr,
            variant_arr=variant_arr,
            part_name_arr=part_name_arr,
            remarks_arr=remarks_arr,
            shift_arr=shift_arr,
            end_time=end_time,
            start_time=start_time,
            record_per_page=record_per_page,
            page_number=page_number,
            zone=zone,
            count_order=count_order,
        )
        return self.transform_latest_insights(data)

    def transform_latest_insights(self, data):
        """Group insight rows by ``param_id`` and collect their remarks.

        The first row seen for a parameter supplies all scalar fields. Each
        row (first or later) that carries a non-empty ``component_remarks``
        contributes one ``{'comment', 'created_by', 'created_at'}`` entry to
        that parameter's ``remarks`` list. ``remarks`` stays ``None`` when no
        row for the parameter has a remark.

        Args:
            data: Iterable of row objects exposing the attributes read below.

        Returns:
            dict: ``{'parameters_insight': [entry, ...]}`` in first-seen order.
        """
        combined_data = {}
        for each in data:
            remark = None
            if each.component_remarks:
                remark = {'comment': each.component_remarks,
                          'created_by': each.created_by,
                          'created_at': each.created_at}

            entry = combined_data.get(each.param_id)
            if entry is not None:
                # Later row for an already-known parameter: only its remark
                # matters. Create the list lazily so a remark is not lost when
                # the first row had none.
                if remark is not None:
                    if entry['remarks'] is None:
                        entry['remarks'] = []
                    entry['remarks'].append(remark)
                continue

            combined_data[each.param_id] = {
                'parameter_id': each.param_id,
                'line_name': each.line_name,
                'subline_name': each.final_subline_query,
                'station_name': each.station_name,
                'equipment_number': each.equipment_number,
                'parameter': each.parameter_name,
                # Rows without a zone status are reported as GREY.
                'color_status': each.zone_status if each.zone_status else InsightStatus.GREY.value,
                'unit': each.uom,
                'standard': each.standard_value,
                'actual': each.actual_value,
                'event_type': each.event_type,
                'comparator': each.comparator_value,
                'remarks': [remark] if remark is not None else None,
                'model': each.model_name,
                'count': each.count,
                'variant': each.parameter_variant_name,
                'part_serial_no': each.part_serial_number,
                'equipment_name': each.equipment_name,
                'part_name': each.parameter_part_name,
                'shift': each.shift,
            }
        return {'parameters_insight': list(combined_data.values())}

    def get_insight_fiters(self, shop_id, parameter_name=None, from_date=None, to_date=None):
        """Return the filter options available for a shop.

        The method name keeps its historical spelling because callers and the
        repository method of the same name depend on it.

        Returns:
            list[dict]: See :meth:`transform_insight_filters`.
        """
        data = self.repository.get_insight_fiters(shop_id=shop_id, parameter_name=parameter_name,
                                                  from_date=from_date, to_date=to_date)
        return self.transform_insight_filters(data)

    def get_parameter_insight_filters(self, equipment_id=None, station_id=None, line_id=None, subline_id=None,
                                      model_id=None,
                                      shop_id=None, eqipment_number=None, page_number=1, records_per_page=100):
        """Return the parameter names matching the given filters.

        ``eqipment_number`` is accepted for backward compatibility but is not
        forwarded to the repository.

        Returns:
            list[dict]: ``[{'parameter_name': ...}, ...]``.
        """
        data = self.repository.get_parameter_insight_filters(
            shop_id=shop_id,
            equipment_id=equipment_id,
            station_id=station_id,
            line_id=line_id,
            subline_id=subline_id,
            model_id=model_id,
            page_number=page_number,
            records_per_page=records_per_page,
        )
        return self.transform_parameter_insight_filters(data)

    def transform_parameter_insight_filters(self, data):
        """Wrap the first column of every row as ``{'parameter_name': value}``."""
        return [{'parameter_name': each[0]} for each in data]

    def transform_insight_filters(self, data):
        """Convert 16-column filter rows into labelled dictionaries.

        Each row must unpack into exactly sixteen values in the order listed
        below; a row of any other length raises ``ValueError``.
        """
        result = []
        for each in data:
            (line_id, line_name, subline_id, subline_name, model_id, model_name,
             station_id, station_name, variant_id, variant_name, equipment_id,
             equipment_name, part_id, part_name, equipment_number, remarks) = each
            result.append({
                "Line_id": line_id,
                "Line_Name": line_name,
                "Subline_id": subline_id,
                "Subline_Name": subline_name,
                "Model_id": model_id,
                "Model_Name": model_name,
                "Station_id": station_id,
                "Station_Name": station_name,
                "Variant_id": variant_id,
                "Variant_Name": variant_name,
                "Equipment_id": equipment_id,
                "Equipment_Name": equipment_name,
                "Equipment Number": equipment_number,
                "Part_id": part_id,
                "Part_Name": part_name,
                "remarks": remarks,
            })
        return result

    def get_latest_insights_matrix(self, shop_id,
                                   line_arr=None,
                                   subline_arr=None,
                                   equipment_name_arr=None,
                                   equipment_number_arr=None,
                                   parameter_name_arr=None,
                                   station_arr=None,
                                   model_arr=None,
                                   variant_arr=None,
                                   part_name_arr=None,
                                   remarks_arr=None,
                                   shift_arr=None,
                                   end_time=None,
                                   start_time=None, ):
        """Return per-status insight counts for the given filters.

        The repository returns ``(total_count, zone_count)``. ``total_count``
        is read as ``total_count[0][0]``; ``zone_count`` is iterated for items
        exposing ``zone`` and ``count``.

        Returns:
            dict: Counts keyed by the GREEN, RED and GREY status values plus
            ``'ALL'``. Zones equal to GREEN (case-insensitive) count as GREEN,
            every other non-null zone counts as RED, and GREY is the
            remainder of ``'ALL'``.
        """
        total_count, zone_count = self.repository.get_latest_insights_matrix(
            shop_id=shop_id,
            line_arr=line_arr,
            subline_arr=subline_arr,
            equipment_name_arr=equipment_name_arr,
            equipment_number_arr=equipment_number_arr,
            parameter_name_arr=parameter_name_arr,
            station_arr=station_arr,
            model_arr=model_arr,
            variant_arr=variant_arr,
            part_name_arr=part_name_arr,
            remarks_arr=remarks_arr,
            shift_arr=shift_arr,
            end_time=end_time,
            start_time=start_time,
        )
        green = InsightStatus.GREEN.value
        red = InsightStatus.RED.value
        grey = InsightStatus.GREY.value
        result = {
            green: 0,
            red: 0,
            grey: 0,
            'ALL': total_count[0][0],
        }
        for item in zone_count:
            if item.zone is None:
                continue
            bucket = green if item.zone.upper() == green else red
            # Accumulate rather than assign so several rows that map to the
            # same bucket (e.g. differently-cased zones) are all counted.
            result[bucket] += item.count
        result[grey] = result['ALL'] - result[green] - result[red]
        return result

    def get_insight_report(self, csvwriter,
                           shop_id=7,
                           line_arr=None,
                           subline_arr=None,
                           equipment_name_arr=None,
                           equipment_number_arr=None,
                           parameter_name_arr=None,
                           station_arr=None,
                           model_arr=None,
                           variant_arr=None,
                           part_name_arr=None,
                           remarks_arr=None,
                           shift_arr=None,
                           end_time=None,
                           start_time=None,
                           record_per_page='all',
                           page_number=1,
                           zone=None,
                           count_order='default',
                           part_ids=None,
                           variant_ids=None,
                           equipment_name_ids=None,
                           station_ids=None,
                           subline_ids=None,
                           model_ids=None,
                           line_ids=None,
                           ):
        """Write the insight report to ``csvwriter`` and return its data rows.

        The ``*_ids`` arguments are used for querying, while the ``*_arr``
        arguments are printed in the report header as the applied filters.

        Args:
            csvwriter: Object exposing ``writerow(list)``.
            start_time, end_time: Optional strings in
                ``'%Y-%m-%d %H:%M:%S'`` format; shown in the header as
                ``'%d-%m-%Y %I:%M %p'`` or ``'All'`` when omitted.

        Returns:
            list[list]: The data rows written after the column header.

        Raises:
            ValueError: If ``start_time``/``end_time`` do not match the
                expected format.
        """
        logger = logging.getLogger(__name__)

        # NOTE(review): the equipment-number filter used to receive
        # ``equipment_name_ids``, so the caller's ``equipment_number_arr`` was
        # never applied. The number filter is now forwarded as-is and the
        # equipment-name ids are used for the name filter when supplied,
        # mirroring the other ``*_ids`` arguments - confirm against the
        # repository's expectations.
        equipment_name_filter = equipment_name_ids if equipment_name_ids is not None else equipment_name_arr
        data = self.repository.get_latest_insights_with_filters(
            shop_id=shop_id,
            line_arr=line_ids,
            subline_arr=subline_ids,
            equipment_name_arr=equipment_name_filter,
            equipment_number_arr=equipment_number_arr,
            parameter_name_arr=parameter_name_arr,
            station_arr=station_ids,
            model_arr=model_ids,
            variant_arr=variant_ids,
            part_name_arr=part_ids,
            remarks_arr=remarks_arr,
            shift_arr=shift_arr,
            end_time=end_time,
            start_time=start_time,
            record_per_page=record_per_page,
            page_number=page_number,
            zone=zone,
            count_order=count_order,
        )
        formatted_data = self.transform_report_data(data)

        from_time = None
        if start_time is not None:
            start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
            from_time = start_time.strftime('%d-%m-%Y %I:%M %p')
        to_time = None
        if end_time is not None:
            end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S')
            to_time = end_time.strftime('%d-%m-%Y %I:%M %p')
        logger.debug("start_time is %s", start_time)

        # Header section: the filters that were applied to this report.
        csv_values = StringHelpers.get_comma_seperated_values
        csvwriter.writerow([])
        csvwriter.writerow(
            ['Line', csv_values(line_arr), '', '', 'Part Name', csv_values(part_name_arr)])
        csvwriter.writerow(
            ['Subline', csv_values(subline_arr), '', '', 'Parameter', csv_values(parameter_name_arr)])
        csvwriter.writerow(
            ['Station', csv_values(station_arr), '', '', 'Remarks', csv_values(remarks_arr)])
        csvwriter.writerow(
            ['Equipment Name', csv_values(equipment_name_arr), '', '', 'From', str(from_time or 'All')])
        csvwriter.writerow(
            ['Equipment Number', csv_values(equipment_number_arr), '', '', 'To', str(to_time or 'All')])
        csvwriter.writerow(
            ['Model', csv_values(model_arr), '', '', 'Shift', str(shift_arr or 'All')])
        csvwriter.writerow(['Variant', csv_values(variant_arr)])
        csvwriter.writerow([])

        # Column titles, followed by one line per parameter.
        csvwriter.writerow(["Part name", "Model", "Variant", "Line", "Subline", "Station", "Equipment Name",
                            "Equipment Number", "Parameter", "Parameter Zone",
                            "Unit", "Actual", "Standard", "Part Serial No.", "Remarks", "Updated By"])
        column_keys = ('part_name', 'model', 'variant', 'line_name', 'subline_name', 'station_name',
                       'equipment_name', 'equipment_number', 'parameter', 'color_status',
                       'unit', 'actual', 'standard', 'part_serial_no', 'remarks', 'updated_by')
        logger.debug("result is %s", formatted_data['parameters_insight'])
        rows = []
        for each in formatted_data['parameters_insight']:
            row = [each.get(key) for key in column_keys]
            csvwriter.writerow(row)
            rows.append(row)
        return rows

    def transform_report_data(self, data):
        """Build one flat report entry per ``param_id``.

        Unlike :meth:`transform_latest_insights`, a later row with the same
        ``param_id`` replaces the earlier entry, and the remark is kept as a
        single value together with ``updated_by``.

        Returns:
            dict: ``{'parameters_insight': [entry, ...]}``.
        """
        combined_data = {}
        for each in data:
            combined_data[each.param_id] = {
                'parameter_id': each.param_id,
                'line_name': each.line_name,
                'subline_name': each.final_subline_query,
                'station_name': each.station_name,
                'equipment_number': each.equipment_number,
                'parameter': each.parameter_name,
                # Rows without a zone status are reported as GREY.
                'color_status': each.zone_status if each.zone_status else InsightStatus.GREY.value,
                'unit': each.uom,
                'standard': each.standard_value,
                'actual': each.actual_value,
                'event_type': each.event_type,
                'comparator': each.comparator_value,
                'remarks': each.component_remarks,
                'updated_by': each.created_by,
                'model': each.model_name,
                'count': each.count,
                'variant': each.parameter_variant_name,
                'part_serial_no': each.part_serial_number,
                'equipment_name': each.equipment_name,
                'part_name': each.parameter_part_name,
                'shift': each.shift,
            }
        return {'parameters_insight': list(combined_data.values())}

    def get_trend_data(self, parameter_id,
                       is_live=False,
                       start_time=None,
                       end_time=None):
        """Return the trend values of one parameter.

        Arguments are forwarded to ``repository.get_trend_data`` and the rows
        are reshaped by :meth:`transform_trend_data`.
        """
        data = self.repository.get_trend_data(
            parameter_id=parameter_id,
            is_live=is_live,
            start_time=start_time,
            end_time=end_time,
        )
        return self.transform_trend_data(data)

    def transform_trend_data(self, data):
        """Convert trend rows into a dict of values plus the covered time span.

        Args:
            data: Iterable of rows exposing ``value``, ``part_serial_no``,
                ``datetime`` and ``variant_name``.

        Returns:
            dict: ``param_values`` (one dict per row, input order preserved),
            ``start_time`` (timestamp of the last row) and ``end_time``
            (timestamp of the first row). Both bounds are ``None`` when there
            are no rows.
        """
        param_values = [
            {
                "value": each.value,
                "part_serial_no": each.part_serial_no,
                "timestamp": each.datetime,
                "variant": each.variant_name,
            }
            for each in data
        ]
        transformed_data = {'param_values': param_values}
        # Guard against an empty result set: indexing [-1]/[0] would raise
        # IndexError otherwise.
        # NOTE(review): last row = start, first row = end, so rows are
        # presumably ordered newest first - confirm against the repository.
        transformed_data['start_time'] = param_values[-1].get('timestamp') if param_values else None
        transformed_data['end_time'] = param_values[0].get('timestamp') if param_values else None
        return transformed_data
Editor is loading...
Leave a Comment