Untitled
unknown
plain_text
17 days ago
20 kB
3
Indexable
from EM.repositories.repository import Repository from EM.repositories.models.aggregation_energy_tracking import AggregationEnergyTracking from EM.repositories.models.aggregated_energy_5min import AggregatedEnergy5min from EM.repositories.models.location import Location from EM.repositories.models.plant import Plant from EM.repositories.models.equipment import Equipment from EM.repositories.models.shop import Shop from EM.repositories.models.line import Line from EM.utilities.constants import AggregationType from sqlalchemy import Date, Interval, and_, case, cast, desc, distinct, func, Integer, literal, not_, or_, select, \ tuple_, nullslast from EM.repositories.models.aggregated_energy_daily import AggregatedEnergydaily from EM.utilities.constants import LOCATION,PLANT,SHOP,LINE,EQUIPMENT from sqlalchemy.sql.expression import extract import datetime class EnergyMeasurementRepository(Repository): def __init__(self, session): self.session = session def get_latest_aggregation_timestamp(self,aggregation_type): with self.session: return self.session.query( AggregationEnergyTracking.id, AggregationEnergyTracking.timestamp, AggregationEnergyTracking.aggregation_type ).filter(AggregationEnergyTracking.aggregation_type == aggregation_type)\ .order_by(desc(AggregationEnergyTracking.id))\ .first() def get_aggregated_data(self, start_timestamp, end_timestamp, aggregation_type="hour"): """Fetches aggregated energy measurement data within a given time range. Args: start_timestamp (datetime): The starting timestamp for the aggregation period. end_timestamp (datetime): The ending timestamp for the aggregation period. aggregation_type (str, optional): The level of aggregation, either 'hour' or 'day'. Defaults to "hour". Returns: list: A list of aggregated energy records grouped by the specified time interval. """ with self.session: # Ensure the session is properly managed # Define the aggregation column based on the given aggregation type aggregation_column = func.date_trunc(aggregation_type, AggregatedEnergy5min.timestamp).label("aggregated_time") # Construct the query to fetch aggregated energy data query = self.session.query( aggregation_column, # Truncated timestamp based on the aggregation type func.sum(AggregatedEnergy5min.value).label("total_energy"), # Sum up energy values within the aggregation period AggregatedEnergy5min.hiererchy_type_id, AggregatedEnergy5min.hiererchy_ref_id, AggregatedEnergy5min.location_id, AggregatedEnergy5min.site_id, AggregatedEnergy5min.plant_id, AggregatedEnergy5min.shop_id, AggregatedEnergy5min.line_id, AggregatedEnergy5min.equipment_id, AggregatedEnergy5min.area_id, AggregatedEnergy5min.station_id, AggregatedEnergy5min.shift, AggregatedEnergy5min.energy_type, AggregatedEnergy5min.subenergy, AggregatedEnergy5min.UOM, AggregatedEnergy5min.usage_type )\ .filter(AggregatedEnergy5min.timestamp.between(start_timestamp, end_timestamp))\ .group_by( # Group data by aggregation time and other relevant fields aggregation_column, AggregatedEnergy5min.hiererchy_type_id, AggregatedEnergy5min.hiererchy_ref_id, AggregatedEnergy5min.location_id, AggregatedEnergy5min.site_id, AggregatedEnergy5min.plant_id, AggregatedEnergy5min.shop_id, AggregatedEnergy5min.line_id, AggregatedEnergy5min.equipment_id, AggregatedEnergy5min.area_id, AggregatedEnergy5min.station_id, AggregatedEnergy5min.shift, AggregatedEnergy5min.energy_type, AggregatedEnergy5min.subenergy, AggregatedEnergy5min.UOM, AggregatedEnergy5min.usage_type )\ .order_by("aggregated_time")\ .all() # Execute the query and fetch all results return query # Return the aggregated data def get_energy_consumption_aggregation(self, start_year=None, end_year=None, start_month=None, end_month=None, plant_id=None, location_id=None, energy_type=None, aggregation_type=None, start_date=None, end_date=None, aggregation_level=None, shift=None): filters = [] # List to store filter conditions aggregation_level_arr = [] # Stores grouping fields aggregation_field = [] # Stores fields used for aggregation join_statements = [] # Stores dynamic joins for different levels select_fields = [AggregatedEnergy5min.site_id] # Base selection fields # Apply filters based on the given year range if start_year is not None and end_year is not None: filters.append(extract('year', AggregatedEnergy5min.timestamp) >= start_year) filters.append(extract('year', AggregatedEnergy5min.timestamp) <= end_year) elif start_year is not None: filters.append(extract('year', AggregatedEnergy5min.timestamp) == start_year) # Apply filters based on the given month range if start_month is not None and end_month is not None: filters.append(extract('month', AggregatedEnergy5min.timestamp) >= start_month) filters.append(extract('month', AggregatedEnergy5min.timestamp) <= end_month) elif start_month is not None: filters.append(extract('month', AggregatedEnergy5min.timestamp) == start_month) # Apply filters based on the given date range if start_date is not None and end_date is not None: filters.append(AggregatedEnergy5min.timestamp >= start_date) filters.append(AggregatedEnergy5min.timestamp <= end_date) # Define aggregation levels, add necessary joins, and extract names if aggregation_level == LOCATION: aggregation_level_arr.append(AggregatedEnergy5min.location_id) join_statements.append((Location, AggregatedEnergy5min.location_id == Location.id)) select_fields.append(Location.name.label("location_name")) elif aggregation_level == PLANT: aggregation_level_arr.extend([AggregatedEnergy5min.location_id, AggregatedEnergy5min.plant_id]) join_statements.append((Location, AggregatedEnergy5min.location_id == Location.id)) select_fields.append(Location.name.label("location_name")) elif aggregation_level == SHOP: aggregation_level_arr.extend([AggregatedEnergy5min.location_id, AggregatedEnergy5min.plant_id, AggregatedEnergy5min.shop_id]) join_statements.append((Location, AggregatedEnergy5min.location_id == Location.id)) join_statements.append((Shop, AggregatedEnergy5min.shop_id == Shop.id)) select_fields.extend([Location.name.label("location_name"), Shop.name.label("shop_name")]) elif aggregation_level == LINE: aggregation_level_arr.extend([AggregatedEnergy5min.location_id, AggregatedEnergy5min.plant_id, AggregatedEnergy5min.shop_id, AggregatedEnergy5min.line_id]) join_statements.append((Location, AggregatedEnergy5min.location_id == Location.id)) join_statements.append((Shop, AggregatedEnergy5min.shop_id == Shop.id)) join_statements.append((Line, AggregatedEnergy5min.line_id == Line.id)) select_fields.extend([Location.name.label("location_name"), Shop.name.label("shop_name"), Line.name.label("line_name")]) elif aggregation_level == EQUIPMENT: aggregation_level_arr.extend([AggregatedEnergy5min.location_id, AggregatedEnergy5min.plant_id, AggregatedEnergy5min.shop_id, AggregatedEnergy5min.line_id, AggregatedEnergy5min.equipment_id]) join_statements.append((Location, AggregatedEnergy5min.location_id == Location.id)) join_statements.append((Shop, AggregatedEnergy5min.shop_id == Shop.id)) join_statements.append((Line, AggregatedEnergy5min.line_id == Line.id)) join_statements.append((Equipment, AggregatedEnergy5min.equipment_id == Equipment.id)) select_fields.extend([ Location.name.label("location_name"), Shop.name.label("shop_name"), Line.name.label("line_name"), Equipment.name.label("equipment_name") ]) # Apply shift filtering if provided if shift: aggregation_level_arr.append(AggregatedEnergy5min.shift) filters.append(AggregatedEnergy5min.shift == shift) # Apply additional filters for location, plant, and energy type if location_id: filters.append(AggregatedEnergy5min.location_id == location_id) if plant_id: filters.append(AggregatedEnergy5min.plant_id == plant_id) if energy_type: filters.append(AggregatedEnergy5min.energy_type == energy_type) # Define aggregation fields based on the selected aggregation type if aggregation_type == AggregationType.Yearly.value: aggregation_field = [extract('year', AggregatedEnergy5min.timestamp).label('year')] elif aggregation_type == AggregationType.Monthly.value: aggregation_field = [ extract('year', AggregatedEnergy5min.timestamp).label('year'), extract('month', AggregatedEnergy5min.timestamp).label('month') ] elif aggregation_type == AggregationType.Weekly.value: aggregation_field = [ extract('year', AggregatedEnergy5min.timestamp).label('year'), extract('week', AggregatedEnergy5min.timestamp).label('week') ] elif aggregation_type == AggregationType.Daily.value: aggregation_field = [ func.date(AggregatedEnergy5min.timestamp).label('date') ] # Add aggregation fields and energy-related fields to the selection select_fields.extend([ *aggregation_level_arr, AggregatedEnergy5min.shift, AggregatedEnergy5min.energy_type, AggregatedEnergy5min.subenergy, AggregatedEnergy5min.UOM, func.sum(AggregatedEnergy5min.value).label("total_value"), func.sum(AggregatedEnergy5min.units_produced).label("total_units"), *aggregation_field ]) # Dynamically construct order_by based on available fields order_by_fields = [ extract('year', AggregatedEnergy5min.timestamp), extract('month', AggregatedEnergy5min.timestamp), func.date(AggregatedEnergy5min.timestamp) ] order_by_fields.extend(aggregation_level_arr) with self.session: # Start query query = self.session.query(*select_fields) # Apply dynamic joins for table, condition in join_statements: query = query.join(table, condition, isouter=True) # Apply filters, grouping, and ordering query = query.filter(*filters)\ .group_by(*select_fields)\ .order_by(*order_by_fields)\ .all() for each in query: print(each) # Debugging statement to print query results return query def add_data(self): with self.session: query = self.session.query( AggregatedEnergy5min.hiererchy_type_id, AggregatedEnergy5min.hiererchy_ref_id, AggregatedEnergy5min.location_id, AggregatedEnergy5min.site_id, AggregatedEnergy5min.plant_id, AggregatedEnergy5min.shop_id, AggregatedEnergy5min.line_id, AggregatedEnergy5min.equipment_id, AggregatedEnergy5min.area_id, AggregatedEnergy5min.station_id, AggregatedEnergy5min.shift, AggregatedEnergy5min.energy_type, AggregatedEnergy5min.subenergy, AggregatedEnergy5min.UOM, AggregatedEnergy5min.usage_type, AggregatedEnergy5min.timestamp, AggregatedEnergy5min.value, AggregatedEnergy5min.units_produced ).order_by(AggregatedEnergy5min.timestamp).all() arr = [] for each in query: temp = each.timestamp - datetime.timedelta(days=366) arr.append(AggregatedEnergy5min( hiererchy_type_id=each.hiererchy_type_id, hiererchy_ref_id=each.hiererchy_ref_id, location_id=each.location_id, site_id=each.site_id, plant_id=each.plant_id, shop_id=each.shop_id, line_id=each.line_id, area_id=each.area_id, station_id=each.station_id, equipment_id=each.equipment_id, shift=each.shift, energy_type=each.energy_type, subenergy=each.subenergy, UOM=each.UOM, value=each.value, # Assign the total aggregated energy value timestamp=temp, # Timestamp when the data was aggregated usage_type=each.usage_type, units_produced = each.units_produced )) for each in query: temp = each.timestamp - datetime.timedelta(days=366+365) arr.append(AggregatedEnergy5min( hiererchy_type_id=each.hiererchy_type_id, hiererchy_ref_id=each.hiererchy_ref_id, location_id=each.location_id, site_id=each.site_id, plant_id=each.plant_id, shop_id=each.shop_id, line_id=each.line_id, area_id=each.area_id, station_id=each.station_id, equipment_id=each.equipment_id, shift=each.shift, energy_type=each.energy_type, subenergy=each.subenergy, UOM=each.UOM, value=each.value, # Assign the total aggregated energy value timestamp=temp, # Timestamp when the data was aggregated usage_type=each.usage_type, units_produced = each.units_produced )) for each in query: temp = each.timestamp - datetime.timedelta(days=366+365+365) arr.append(AggregatedEnergy5min( hiererchy_type_id=each.hiererchy_type_id, hiererchy_ref_id=each.hiererchy_ref_id, location_id=each.location_id, site_id=each.site_id, plant_id=each.plant_id, shop_id=each.shop_id, line_id=each.line_id, area_id=each.area_id, station_id=each.station_id, equipment_id=each.equipment_id, shift=each.shift, energy_type=each.energy_type, subenergy=each.subenergy, UOM=each.UOM, value=each.value, # Assign the total aggregated energy value timestamp=temp, # Timestamp when the data was aggregated usage_type=each.usage_type, units_produced = each.units_produced )) for each in query: temp = each.timestamp - datetime.timedelta(days=366+365+365+365) arr.append(AggregatedEnergy5min( hiererchy_type_id=each.hiererchy_type_id, hiererchy_ref_id=each.hiererchy_ref_id, location_id=each.location_id, site_id=each.site_id, plant_id=each.plant_id, shop_id=each.shop_id, line_id=each.line_id, area_id=each.area_id, station_id=each.station_id, equipment_id=each.equipment_id, shift=each.shift, energy_type=each.energy_type, subenergy=each.subenergy, UOM=each.UOM, value=each.value, # Assign the total aggregated energy value timestamp=temp, # Timestamp when the data was aggregated usage_type=each.usage_type, units_produced = each.units_produced )) for each in query: temp = each.timestamp - datetime.timedelta(days=366+365+365+365+365) arr.append(AggregatedEnergy5min( hiererchy_type_id=each.hiererchy_type_id, hiererchy_ref_id=each.hiererchy_ref_id, location_id=each.location_id, site_id=each.site_id, plant_id=each.plant_id, shop_id=each.shop_id, line_id=each.line_id, area_id=each.area_id, station_id=each.station_id, equipment_id=each.equipment_id, shift=each.shift, energy_type=each.energy_type, subenergy=each.subenergy, UOM=each.UOM, value=each.value, # Assign the total aggregated energy value timestamp=temp, # Timestamp when the data was aggregated usage_type=each.usage_type, units_produced = each.units_produced )) self.bulk_insert(arr)
Editor is loading...
Leave a Comment