Untitled
unknown
plain_text
a year ago
18 kB
6
Indexable
import json
import logging
from datetime import datetime

from PG.repositories.insight_repository import InsightRepository
from PG.services.helpers.insight_standard_helper import InsightStatus
from PG.repositories.models.parameter_measurement import ParameterMeasurement
from PG.services.helpers.string_helpers import StringHelpers

logger = logging.getLogger(__name__)


class InsightService:
    """Service layer that queries insight data through a repository and
    reshapes the rows into plain dicts / lists for callers."""

    def __init__(self, repository: InsightRepository):
        """Store the repository used for every query made by this service."""
        self.repository = repository
        self.model_type = ParameterMeasurement

    def get_latest_insights_with_filters(self, shop_id,
                                         line_arr=None,
                                         subline_arr=None,
                                         equipment_name_arr=None,
                                         equipment_number_arr=None,
                                         parameter_name_arr=None,
                                         station_arr=None,
                                         model_arr=None,
                                         variant_arr=None,
                                         part_name_arr=None,
                                         remarks_arr=None,
                                         shift_arr=None,
                                         end_time=None,
                                         start_time=None,
                                         record_per_page='all',
                                         page_number=1,
                                         zone=None,
                                         count_order='default'
                                         ):
        """Fetch the latest insights for a shop and return them transformed.

        All filter arguments are forwarded unchanged to
        ``repository.get_latest_insights_with_filters``; the rows it returns
        are passed through ``transform_latest_insights``.
        """
        data = self.repository.get_latest_insights_with_filters(
            shop_id=shop_id,
            line_arr=line_arr,
            subline_arr=subline_arr,
            equipment_name_arr=equipment_name_arr,
            equipment_number_arr=equipment_number_arr,
            parameter_name_arr=parameter_name_arr,
            station_arr=station_arr,
            model_arr=model_arr,
            variant_arr=variant_arr,
            part_name_arr=part_name_arr,
            remarks_arr=remarks_arr,
            shift_arr=shift_arr,
            end_time=end_time,
            start_time=start_time,
            record_per_page=record_per_page,
            page_number=page_number,
            zone=zone,
            count_order=count_order,
        )
        return self.transform_latest_insights(data)

    @staticmethod
    def _build_insight_row(each, remark_fields):
        """Build the output dict for one insight row.

        ``remark_fields`` holds the remark-related entries, which differ
        between the API and the report output; they are inserted right after
        ``comparator`` so the key order of the resulting dict is unchanged.
        """
        row = {
            'parameter_id': each.param_id,
            'line_name': each.line_name,
            'subline_name': each.final_subline_query,
            'station_name': each.station_name,
            'equipment_number': each.equipment_number,
            'parameter': each.parameter_name,
            # Rows without a zone status are reported as GREY.
            'color_status': each.zone_status if each.zone_status else InsightStatus.GREY.value,
            'unit': each.uom,
            'standard': each.standard_value,
            'actual': each.actual_value,
            'event_type': each.event_type,
            'comparator': each.comparator_value,
        }
        row.update(remark_fields)
        row.update({
            'model': each.model_name,
            'count': each.count,
            'variant': each.parameter_variant_name,
            'part_serial_no': each.part_serial_number,
            'equipment_name': each.equipment_name,
            'part_name': each.parameter_part_name,
            'shift': each.shift,
        })
        return row

    def transform_latest_insights(self, data):
        """Group insight rows by ``param_id`` and collect their remarks.

        The first row seen for a parameter supplies every field except
        ``remarks``; each row (first or later) that carries a non-empty
        ``component_remarks`` contributes one
        ``{'comment', 'created_by', 'created_at'}`` entry. ``remarks`` stays
        ``None`` when no row for the parameter has a remark.

        Returns:
            ``{'parameters_insight': [row_dict, ...]}`` in first-seen order.
        """
        combined_data = {}
        for each in data:
            remark = None
            if each.component_remarks:
                remark = {
                    'comment': each.component_remarks,
                    'created_by': each.created_by,
                    'created_at': each.created_at,
                }

            existing = combined_data.get(each.param_id)
            if existing is None:
                combined_data[each.param_id] = self._build_insight_row(
                    each, {'remarks': [remark] if remark else None})
            elif remark is not None:
                # The first row may have had no remark, leaving the list
                # unset; create it here so later remarks are not lost.
                if existing['remarks'] is None:
                    existing['remarks'] = []
                existing['remarks'].append(remark)

        return {'parameters_insight': list(combined_data.values())}

    def get_insight_fiters(self, shop_id, parameter_name=None, from_date=None, to_date=None):
        """Fetch the available insight filter values and return them as dicts
        (see ``transform_insight_filters``)."""
        data = self.repository.get_insight_fiters(
            shop_id=shop_id,
            parameter_name=parameter_name,
            from_date=from_date,
            to_date=to_date,
        )
        return self.transform_insight_filters(data)

    def get_parameter_insight_filters(self, equipment_id=None, station_id=None, line_id=None,
                                      subline_id=None, model_id=None, shop_id=None,
                                      eqipment_number=None, page_number=1,
                                      records_per_page=100):
        """Fetch parameter names matching the given filters.

        NOTE(review): ``eqipment_number`` is accepted for interface
        compatibility but is not forwarded to the repository.
        """
        data = self.repository.get_parameter_insight_filters(
            shop_id=shop_id,
            equipment_id=equipment_id,
            station_id=station_id,
            line_id=line_id,
            subline_id=subline_id,
            model_id=model_id,
            page_number=page_number,
            records_per_page=records_per_page,
        )
        return self.transform_parameter_insight_filters(data)

    def transform_parameter_insight_filters(self, data):
        """Wrap the first column of every row as ``{'parameter_name': value}``."""
        return [{'parameter_name': each[0]} for each in data]

    def transform_insight_filters(self, data):
        """Convert 16-column filter rows into dicts with display-style keys.

        Each row must unpack into exactly sixteen values in the order used
        below; a row of a different length raises ``ValueError``.
        """
        result = []
        for each in data:
            (line_id, line_name, subline_id, subline_name, model_id, model_name,
             station_id, station_name, variant_id, variant_name, equipment_id,
             equipment_name, part_id, part_name, equipment_number, remarks) = each
            result.append({
                "Line_id": line_id,
                "Line_Name": line_name,
                "Subline_id": subline_id,
                "Subline_Name": subline_name,
                "Model_id": model_id,
                "Model_Name": model_name,
                "Station_id": station_id,
                "Station_Name": station_name,
                "Variant_id": variant_id,
                "Variant_Name": variant_name,
                "Equipment_id": equipment_id,
                "Equipment_Name": equipment_name,
                "Equipment Number": equipment_number,
                "Part_id": part_id,
                "Part_Name": part_name,
                "remarks": remarks,
            })
        return result

    def get_latest_insights_matrix(self, shop_id,
                                   line_arr=None,
                                   subline_arr=None,
                                   equipment_name_arr=None,
                                   equipment_number_arr=None,
                                   parameter_name_arr=None,
                                   station_arr=None,
                                   model_arr=None,
                                   variant_arr=None,
                                   part_name_arr=None,
                                   remarks_arr=None,
                                   shift_arr=None,
                                   end_time=None,
                                   start_time=None,
                                   ):
        """Return insight counts per status plus the overall total.

        Zone rows whose upper-cased ``zone`` equals the GREEN status value are
        counted as GREEN, every other non-null zone as RED, and GREY is
        whatever remains of the total. Counts are accumulated, so several
        rows mapping to the same status add up instead of overwriting each
        other.

        Returns:
            Dict keyed by the GREEN / RED / GREY status values and ``'ALL'``.
        """
        total_count, zone_count = self.repository.get_latest_insights_matrix(
            shop_id=shop_id,
            line_arr=line_arr,
            subline_arr=subline_arr,
            equipment_name_arr=equipment_name_arr,
            equipment_number_arr=equipment_number_arr,
            parameter_name_arr=parameter_name_arr,
            station_arr=station_arr,
            model_arr=model_arr,
            variant_arr=variant_arr,
            part_name_arr=part_name_arr,
            remarks_arr=remarks_arr,
            shift_arr=shift_arr,
            end_time=end_time,
            start_time=start_time,
        )
        green = InsightStatus.GREEN.value
        red = InsightStatus.RED.value
        grey = InsightStatus.GREY.value

        result = {
            green: 0,
            red: 0,
            grey: 0,
            'ALL': total_count[0][0],
        }
        for item in zone_count:
            if item.zone is None:
                continue
            bucket = green if item.zone.upper() == green else red
            result[bucket] += item.count

        result[grey] = result['ALL'] - result[green] - result[red]
        return result

    def get_insight_report(self, csvwriter, shop_id=7,
                           line_arr=None,
                           subline_arr=None,
                           equipment_name_arr=None,
                           equipment_number_arr=None,
                           parameter_name_arr=None,
                           station_arr=None,
                           model_arr=None,
                           variant_arr=None,
                           part_name_arr=None,
                           remarks_arr=None,
                           shift_arr=None,
                           end_time=None,
                           start_time=None,
                           record_per_page='all',
                           page_number=1,
                           zone=None,
                           count_order='default',
                           part_ids=None,
                           variant_ids=None,
                           equipment_name_ids=None,
                           station_ids=None,
                           subline_ids=None,
                           model_ids=None,
                           line_ids=None,
                           ):
        """Write an insight report to ``csvwriter`` and return its data rows.

        The ``*_ids`` arguments are what is sent to the repository query; the
        matching ``*_arr`` arguments (line, subline, station, equipment name,
        model, variant, part name) are only printed in the report header.
        ``start_time`` / ``end_time``, when given, must be strings in
        ``'%Y-%m-%d %H:%M:%S'`` format (``datetime.strptime`` raises
        ``ValueError`` otherwise).

        Returns:
            The list of data rows written after the column header.
        """
        data = self.repository.get_latest_insights_with_filters(
            shop_id=shop_id,
            line_arr=line_ids,
            subline_arr=subline_ids,
            equipment_name_arr=equipment_name_ids,
            equipment_number_arr=equipment_number_arr,
            parameter_name_arr=parameter_name_arr,
            station_arr=station_ids,
            model_arr=model_ids,
            variant_arr=variant_ids,
            part_name_arr=part_ids,
            remarks_arr=remarks_arr,
            shift_arr=shift_arr,
            end_time=end_time,
            start_time=start_time,
            record_per_page=record_per_page,
            page_number=page_number,
            zone=zone,
            count_order=count_order,
        )
        formatted_data = self.transform_report_data(data)

        # Re-format the time window for display in the report header.
        input_format = '%Y-%m-%d %H:%M:%S'
        display_format = '%d-%m-%Y %I:%M %p'
        from_time = None
        if start_time is not None:
            from_time = datetime.strptime(start_time, input_format).strftime(display_format)
        to_time = None
        if end_time is not None:
            to_time = datetime.strptime(end_time, input_format).strftime(display_format)
        logger.debug("insight report window: from=%s to=%s", from_time, to_time)

        join = StringHelpers.get_comma_seperated_values
        csvwriter.writerow([])
        csvwriter.writerow(['Line', join(line_arr), '', '', 'Part Name', join(part_name_arr)])
        csvwriter.writerow(['Subline', join(subline_arr), '', '', 'Parameter', join(parameter_name_arr)])
        csvwriter.writerow(['Station', join(station_arr), '', '', 'Remarks', join(remarks_arr)])
        csvwriter.writerow(['Equipment Name', join(equipment_name_arr), '', '', 'From',
                            str(from_time or 'All')])
        csvwriter.writerow(['Equipment Number', join(equipment_number_arr), '', '', 'To',
                            str(to_time or 'All')])
        csvwriter.writerow(['Model', join(model_arr), '', '', 'Shift', str(shift_arr or 'All')])
        csvwriter.writerow(['Variant', join(variant_arr)])
        csvwriter.writerow([])

        fields = ["Part name", "Model", "Variant", "Line", "Subline", "Station",
                  "Equipment Name", "Equipment Number", "Parameter", "Parameter Zone",
                  "Unit", "Actual", "Standard", "Part Serial No.", "Remarks", "Updated By"]
        csvwriter.writerow(fields)

        # Dict keys in the same order as the column header above.
        column_keys = ('part_name', 'model', 'variant', 'line_name', 'subline_name',
                       'station_name', 'equipment_name', 'equipment_number', 'parameter',
                       'color_status', 'unit', 'actual', 'standard', 'part_serial_no',
                       'remarks', 'updated_by')
        insights = formatted_data['parameters_insight']
        logger.debug("insight report rows: %d", len(insights))

        rows = []
        for each in insights:
            row = [each.get(key) for key in column_keys]
            csvwriter.writerow(row)
            rows.append(row)
        return rows

    def transform_report_data(self, data):
        """Build one report dict per ``param_id``.

        When several rows share a ``param_id`` the last one wins, but the
        parameter keeps the position of its first occurrence.

        Returns:
            ``{'parameters_insight': [row_dict, ...]}``.
        """
        combined_data = {}
        for each in data:
            combined_data[each.param_id] = self._build_insight_row(each, {
                'remarks': each.component_remarks,
                'updated_by': each.created_by,
            })
        return {'parameters_insight': list(combined_data.values())}

    def get_trend_data(self, parameter_id, is_live=False, start_time=None, end_time=None):
        """Fetch trend rows for a parameter and return them transformed."""
        data = self.repository.get_trend_data(
            parameter_id=parameter_id,
            is_live=is_live,
            start_time=start_time,
            end_time=end_time,
        )
        return self.transform_trend_data(data)

    def transform_trend_data(self, data):
        """Convert trend rows into ``{'param_values', 'start_time', 'end_time'}``.

        ``start_time`` is the timestamp of the last row and ``end_time`` that
        of the first row (presumably the repository returns rows newest
        first - confirm against the query). Both are ``None`` when ``data``
        is empty.
        """
        param_values = [
            {
                "value": each.value,
                "part_serial_no": each.part_serial_no,
                "timestamp": each.datetime,
                "variant": each.variant_name,
            }
            for each in data
        ]
        transformed_data = {'param_values': param_values}
        # Guard the empty case: indexing an empty list would raise IndexError.
        transformed_data['start_time'] = param_values[-1].get('timestamp') if param_values else None
        transformed_data['end_time'] = param_values[0].get('timestamp') if param_values else None
        return transformed_data
Editor is loading...
Leave a Comment