Untitled
unknown
plain_text
15 days ago
17 kB
8
Indexable
import datetime
import logging

from sqlalchemy import (
    Date,
    Integer,
    Interval,
    and_,
    case,
    cast,
    desc,
    distinct,
    func,
    literal,
    not_,
    nullslast,
    or_,
    select,
    tuple_,
)
from sqlalchemy.sql.expression import extract

from EM.repositories.repository import Repository
from EM.repositories.models.aggregation_energy_tracking import AggregationEnergyTracking
from EM.repositories.models.aggregated_energy_5min import AggregatedEnergy5min
from EM.repositories.models.aggregated_energy_daily import AggregatedEnergydaily
from EM.utilities.constants import (
    EQUIPMENT,
    LINE,
    LOCATION,
    PLANT,
    SHOP,
    AggregationType,
)

logger = logging.getLogger(__name__)

# Attribute names of the non-measure columns that get_aggregated_data groups by
# and that add_data copies verbatim. The order is the order in which the columns
# appear in the result rows of get_aggregated_data.
_DIMENSION_FIELDS = (
    "hiererchy_type_id",
    "hiererchy_ref_id",
    "location_id",
    "site_id",
    "plant_id",
    "shop_id",
    "line_id",
    "equipment_id",
    "area_id",
    "station_id",
    "shift",
    "energy_type",
    "subenergy",
    "UOM",
    "usage_type",
)

# Day offsets used by add_data: 366, 731, 1096, 1461 and 1826 days back.
# NOTE(review): these look like "one to five years earlier" approximated with a
# single 366-day year followed by 365-day years -- confirm that is the intent.
_BACKFILL_DAY_OFFSETS = tuple(366 + 365 * extra_years for extra_years in range(5))


class EnergyMeasurementRepository(Repository):
    """Read, aggregate and back-fill rows of the AggregatedEnergy5min table."""

    def __init__(self, session):
        """Store the SQLAlchemy session used by every query in this repository.

        Args:
            session: Session object; each method uses it as a context manager.
        """
        # NOTE(review): Repository.__init__ is not called here -- confirm the
        # base class needs no initialisation of its own.
        self.session = session

    def get_latest_aggregation_timestamp(self, aggregation_type):
        """Return the newest tracking row for one aggregation type.

        Args:
            aggregation_type: Value matched against
                AggregationEnergyTracking.aggregation_type.

        Returns:
            A row of (id, timestamp, aggregation_type) with the highest id, or
            None when no row matches.
        """
        with self.session:
            return (
                self.session.query(
                    AggregationEnergyTracking.id,
                    AggregationEnergyTracking.timestamp,
                    AggregationEnergyTracking.aggregation_type,
                )
                .filter(AggregationEnergyTracking.aggregation_type == aggregation_type)
                .order_by(desc(AggregationEnergyTracking.id))
                .first()
            )

    def get_aggregated_data(self, start_timestamp, end_timestamp, aggregation_type="hour"):
        """Fetch summed energy values bucketed by a truncated timestamp.

        Args:
            start_timestamp (datetime): Lower bound of the period.
            end_timestamp (datetime): Upper bound of the period.
            aggregation_type (str, optional): Precision handed to the database
                ``date_trunc`` function, e.g. 'hour' or 'day'. Defaults to "hour".

        Returns:
            list: Rows of (aggregated_time, total_energy, <dimension columns in
            the order of _DIMENSION_FIELDS>), ordered by aggregated_time.
        """
        dimensions = self._dimension_columns()
        bucket = func.date_trunc(
            aggregation_type, AggregatedEnergy5min.timestamp
        ).label("aggregated_time")

        with self.session:
            return (
                self.session.query(
                    bucket,
                    func.sum(AggregatedEnergy5min.value).label("total_energy"),
                    *dimensions,
                )
                # NOTE(review): between() includes both bounds, so a row lying
                # exactly on a boundary is counted by two back-to-back windows.
                .filter(AggregatedEnergy5min.timestamp.between(start_timestamp, end_timestamp))
                .group_by(bucket, *dimensions)
                .order_by("aggregated_time")
                .all()
            )

    def get_energy_consumption_aggregation(self, start_year=None, end_year=None, start_month=None,
                                           end_month=None, plant_id=None, location_id=None,
                                           energy_type=None, aggregation_type=None,
                                           start_date=None, end_date=None,
                                           aggregation_level=None, shift=None):
        """Sum energy value and units produced, grouped by hierarchy and period.

        Args:
            start_year, end_year: Inclusive year range. With only start_year the
                filter is an exact match; end_year alone is ignored.
            start_month, end_month: Same rules as the year pair, applied to the
                month number.
            plant_id, location_id, energy_type: Optional equality filters;
                skipped when falsy.
            aggregation_type: One of the AggregationType values (Yearly,
                Monthly, Weekly, Daily); selects the trailing period columns.
            start_date, end_date: Inclusive timestamp range; applied only when
                both are given.
            aggregation_level: LOCATION, PLANT, SHOP, LINE or EQUIPMENT; selects
                how deep the hierarchy columns go. Any other value adds none.
            shift: Optional equality filter on the shift column.

        Returns:
            list: Rows of (site_id, <hierarchy columns>, shift, energy_type,
            subenergy, UOM, total_value, total_units, <period columns>).
        """
        filters = self._time_filters(
            start_year, end_year, start_month, end_month, start_date, end_date
        )
        level_columns = self._hierarchy_columns(aggregation_level)

        if shift:
            # NOTE(review): shift is already part of the fixed column list
            # below, so this makes it appear twice in the select/group-by.
            # Kept as-is so the row layout seen by callers does not change.
            level_columns.append(AggregatedEnergy5min.shift)
            filters.append(AggregatedEnergy5min.shift == shift)
        # NOTE(review): truthiness checks mean an id of 0 is treated as "no filter".
        if location_id:
            filters.append(AggregatedEnergy5min.location_id == location_id)
        if plant_id:
            filters.append(AggregatedEnergy5min.plant_id == plant_id)
        if energy_type:
            filters.append(AggregatedEnergy5min.energy_type == energy_type)

        period_columns = self._period_columns(aggregation_type)
        group_columns = [
            AggregatedEnergy5min.site_id,
            *level_columns,
            AggregatedEnergy5min.shift,
            AggregatedEnergy5min.energy_type,
            AggregatedEnergy5min.subenergy,
            AggregatedEnergy5min.UOM,
        ]

        with self.session:
            rows = (
                self.session.query(
                    *group_columns,
                    func.sum(AggregatedEnergy5min.value).label("total_value"),
                    func.sum(AggregatedEnergy5min.units_produced).label("total_units"),
                    *period_columns,
                )
                .filter(*filters)
                .group_by(*group_columns, *period_columns)
                .all()
            )

        # Diagnostics go through logging instead of printing every row to stdout.
        logger.debug("get_energy_consumption_aggregation returned %d rows", len(rows))
        return rows

    def add_data(self):
        """Clone every existing 5-minute row at each offset in _BACKFILL_DAY_OFFSETS.

        Reads the whole table ordered by timestamp, builds one copy of every
        row per offset with the timestamp moved back by that many days, and
        passes the full list to ``self.bulk_insert`` (not defined in this
        class; presumably inherited from Repository).
        """
        with self.session:
            rows = (
                self.session.query(
                    *self._dimension_columns(),
                    AggregatedEnergy5min.timestamp,
                    AggregatedEnergy5min.value,
                    AggregatedEnergy5min.units_produced,
                )
                .order_by(AggregatedEnergy5min.timestamp)
                .all()
            )

            # All copies for the nearest offset come first, then the next one,
            # and so on; within an offset the source timestamp order is kept.
            clones = [
                self._shifted_copy(row, days_back)
                for days_back in _BACKFILL_DAY_OFFSETS
                for row in rows
            ]
            self.bulk_insert(clones)

    @staticmethod
    def _dimension_columns():
        """Return the AggregatedEnergy5min columns named in _DIMENSION_FIELDS."""
        return [getattr(AggregatedEnergy5min, name) for name in _DIMENSION_FIELDS]

    @staticmethod
    def _time_filters(start_year, end_year, start_month, end_month, start_date, end_date):
        """Build the year, month and date-range filter conditions."""
        year = extract('year', AggregatedEnergy5min.timestamp)
        month = extract('month', AggregatedEnergy5min.timestamp)
        conditions = []

        if start_year is not None and end_year is not None:
            conditions += [year >= start_year, year <= end_year]
        elif start_year is not None:
            conditions.append(year == start_year)

        # NOTE(review): the month bounds are applied independently of the year,
        # so a range that wraps a year end (e.g. Nov..Feb) matches nothing.
        if start_month is not None and end_month is not None:
            conditions += [month >= start_month, month <= end_month]
        elif start_month is not None:
            conditions.append(month == start_month)

        if start_date is not None and end_date is not None:
            conditions += [
                AggregatedEnergy5min.timestamp >= start_date,
                AggregatedEnergy5min.timestamp <= end_date,
            ]
        return conditions

    @staticmethod
    def _hierarchy_columns(aggregation_level):
        """Return the hierarchy columns from location down to the given level."""
        hierarchy = (
            (LOCATION, AggregatedEnergy5min.location_id),
            (PLANT, AggregatedEnergy5min.plant_id),
            (SHOP, AggregatedEnergy5min.shop_id),
            (LINE, AggregatedEnergy5min.line_id),
            (EQUIPMENT, AggregatedEnergy5min.equipment_id),
        )
        for depth, (level, _column) in enumerate(hierarchy, start=1):
            if aggregation_level == level:
                return [column for _level, column in hierarchy[:depth]]
        return []

    @staticmethod
    def _period_columns(aggregation_type):
        """Return the labelled period columns for an AggregationType value."""
        timestamp = AggregatedEnergy5min.timestamp
        if aggregation_type == AggregationType.Yearly.value:
            return [extract('year', timestamp).label('year')]
        if aggregation_type == AggregationType.Monthly.value:
            return [
                extract('year', timestamp).label('year'),
                extract('month', timestamp).label('month'),
            ]
        if aggregation_type == AggregationType.Weekly.value:
            # NOTE(review): on PostgreSQL 'week' is the ISO week while 'year' is
            # the calendar year, so days around New Year can land in a
            # mismatched (year, week) bucket -- confirm whether 'isoyear' is
            # wanted here.
            return [
                extract('year', timestamp).label('year'),
                extract('week', timestamp).label('week'),
            ]
        if aggregation_type == AggregationType.Daily.value:
            return [func.date(timestamp).label('date')]
        return []

    @staticmethod
    def _shifted_copy(row, days_back):
        """Build an AggregatedEnergy5min equal to row but days_back days earlier."""
        dimensions = {name: getattr(row, name) for name in _DIMENSION_FIELDS}
        return AggregatedEnergy5min(
            **dimensions,
            value=row.value,  # measured value is copied unchanged
            timestamp=row.timestamp - datetime.timedelta(days=days_back),
            units_produced=row.units_produced,
        )
Editor is loading...
Leave a Comment