Untitled

 avatar
unknown
plain_text
15 days ago
17 kB
8
Indexable
from EM.repositories.repository import Repository
from EM.repositories.models.aggregation_energy_tracking import AggregationEnergyTracking
from EM.repositories.models.aggregated_energy_5min import AggregatedEnergy5min
from EM.utilities.constants import AggregationType
from sqlalchemy import Date, Interval, and_, case, cast, desc, distinct, func, Integer, literal, not_, or_, select, \
    tuple_, nullslast
from EM.repositories.models.aggregated_energy_daily import AggregatedEnergydaily
from EM.utilities.constants import LOCATION,PLANT,SHOP,LINE,EQUIPMENT
from sqlalchemy.sql.expression import extract
import datetime
class EnergyMeasurementRepository(Repository):

    def __init__(self, session):
        """Store the session that every query method of this repository uses.

        Args:
            session: Session object. The methods of this class enter it as a
                context manager (``with self.session:``) and call ``query``
                on it.
        """
        # NOTE(review): the base-class initializer is not invoked here —
        # confirm Repository does not need its own setup.
        self.session = session

    def get_latest_aggregation_timestamp(self,aggregation_type):
        """Return the tracking row with the highest id for an aggregation type.

        Args:
            aggregation_type: Value compared for equality against
                ``AggregationEnergyTracking.aggregation_type``.

        Returns:
            A row exposing ``id``, ``timestamp`` and ``aggregation_type``, or
            ``None`` when no tracking row matches.
        """
        tracking = AggregationEnergyTracking

        with self.session:
            matching = self.session.query(
                tracking.id,
                tracking.timestamp,
                tracking.aggregation_type,
            ).filter(tracking.aggregation_type == aggregation_type)

            # Highest id first, so the first row is the most recently created one.
            highest_id_first = matching.order_by(desc(tracking.id))
            return highest_id_first.first()
        
    def get_aggregated_data(self, start_timestamp, end_timestamp, aggregation_type="hour"):
        """Sum 5-minute energy values into larger time buckets.

        Rows of ``AggregatedEnergy5min`` whose timestamp lies between
        ``start_timestamp`` and ``end_timestamp`` (SQL ``BETWEEN``, so both
        bounds are inclusive) are grouped by the truncated timestamp and by
        every hierarchy/classification column, and their ``value`` is summed.

        Args:
            start_timestamp (datetime): Lower bound of the time window.
            end_timestamp (datetime): Upper bound of the time window.
            aggregation_type (str, optional): Precision handed to
                ``date_trunc`` (e.g. 'hour' or 'day'). Defaults to "hour".

        Returns:
            list: Result rows ordered by ``aggregated_time``. Each row carries
            ``aggregated_time``, ``total_energy`` and the grouping columns.
        """
        source = AggregatedEnergy5min

        # Columns that identify one series; declared once so the SELECT list
        # and the GROUP BY clause cannot drift apart.
        dimensions = (
            source.hiererchy_type_id,
            source.hiererchy_ref_id,
            source.location_id,
            source.site_id,
            source.plant_id,
            source.shop_id,
            source.line_id,
            source.equipment_id,
            source.area_id,
            source.station_id,
            source.shift,
            source.energy_type,
            source.subenergy,
            source.UOM,
            source.usage_type,
        )

        time_bucket = func.date_trunc(aggregation_type, source.timestamp).label("aggregated_time")
        energy_sum = func.sum(source.value).label("total_energy")
        inside_window = source.timestamp.between(start_timestamp, end_timestamp)

        with self.session:
            grouped = (
                self.session.query(time_bucket, energy_sum, *dimensions)
                .filter(inside_window)
                .group_by(time_bucket, *dimensions)
                .order_by("aggregated_time")
            )
            return grouped.all()
        
    def get_energy_consumption_aggregation(self,
                                        start_year=None,
                                        end_year=None,
                                        start_month=None,
                                        end_month=None,
                                        plant_id=None,
                                        location_id=None,
                                        energy_type=None,
                                        aggregation_type=None,
                                        start_date=None,
                                        end_date=None,
                                        aggregation_level=None,
                                        shift=None):
        """Aggregate energy value and produced units by hierarchy level and period.

        Every filter argument is optional; the filters that apply are AND-ed.

        Args:
            start_year: With ``end_year``, an inclusive range on the year of
                the timestamp; alone, an exact year match.
            end_year: Upper year bound. Ignored unless ``start_year`` is given.
            start_month: With ``end_month``, an inclusive range on the month
                number; alone, an exact month match. The month filter is
                independent of the year filter.
            end_month: Upper month bound. Ignored unless ``start_month`` is
                given.
            plant_id: Restrict to one plant when truthy.
            location_id: Restrict to one location when truthy.
            energy_type: Restrict to one energy type when truthy.
            aggregation_type: One of the ``AggregationType`` values (Yearly,
                Monthly, Weekly, Daily). Selects the period columns added to
                the result; any other value adds none.
            start_date: Inclusive lower timestamp bound. Applied only when
                ``end_date`` is also given.
            end_date: Inclusive upper timestamp bound. Applied only when
                ``start_date`` is also given.
            aggregation_level: LOCATION, PLANT, SHOP, LINE or EQUIPMENT.
                Selects how many hierarchy id columns are grouped on; any
                other value groups on none of them.
            shift: Restrict to one shift when truthy.

        Returns:
            list: Result rows with ``site_id``, the hierarchy id columns,
            ``shift``, ``energy_type``, ``subenergy``, ``UOM``,
            ``total_value``, ``total_units`` and the period columns.
        """
        source = AggregatedEnergy5min
        filters = []

        # Year filter: inclusive range when both bounds exist, exact match otherwise.
        if start_year is not None and end_year is not None:
            filters.append(extract('year', source.timestamp) >= start_year)
            filters.append(extract('year', source.timestamp) <= end_year)
        elif start_year is not None:
            filters.append(extract('year', source.timestamp) == start_year)

        # Month filter: same rule as the year filter.
        if start_month is not None and end_month is not None:
            filters.append(extract('month', source.timestamp) >= start_month)
            filters.append(extract('month', source.timestamp) <= end_month)
        elif start_month is not None:
            filters.append(extract('month', source.timestamp) == start_month)

        # NOTE(review): a one-sided date range is silently ignored — confirm
        # that callers always pass both bounds.
        if start_date is not None and end_date is not None:
            filters.append(source.timestamp >= start_date)
            filters.append(source.timestamp <= end_date)

        # Each level groups on its own id plus the ids of every level above it,
        # so a level is just a prefix of the hierarchy.
        hierarchy = [
            source.location_id,
            source.plant_id,
            source.shop_id,
            source.line_id,
            source.equipment_id,
        ]
        level_depths = (
            (LOCATION, 1),
            (PLANT, 2),
            (SHOP, 3),
            (LINE, 4),
            (EQUIPMENT, 5),
        )
        aggregation_level_arr = []
        for level, depth in level_depths:
            if aggregation_level == level:
                aggregation_level_arr = hierarchy[:depth]
                break

        if shift:
            # NOTE(review): shift is always selected and grouped below, so
            # this adds the column a second time. Kept as-is so the shape of
            # the returned rows does not change for existing callers.
            aggregation_level_arr.append(source.shift)
            filters.append(source.shift == shift)

        if location_id:
            filters.append(source.location_id == location_id)

        if plant_id:
            filters.append(source.plant_id == plant_id)

        if energy_type:
            filters.append(source.energy_type == energy_type)

        # Period columns appended to both the SELECT list and the GROUP BY.
        aggregation_field = []
        if aggregation_type == AggregationType.Yearly.value:
            aggregation_field = [extract('year', source.timestamp).label('year')]
        elif aggregation_type == AggregationType.Monthly.value:
            aggregation_field = [
                extract('year', source.timestamp).label('year'),
                extract('month', source.timestamp).label('month'),
            ]
        elif aggregation_type == AggregationType.Weekly.value:
            # NOTE(review): on PostgreSQL 'week' is the ISO week, which can
            # belong to a different year than 'year' around New Year;
            # 'isoyear' may be the intended pairing — confirm.
            aggregation_field = [
                extract('year', source.timestamp).label('year'),
                extract('week', source.timestamp).label('week'),
            ]
        elif aggregation_type == AggregationType.Daily.value:
            aggregation_field = [func.date(source.timestamp).label('date')]

        group_columns = [
            source.site_id,
            *aggregation_level_arr,
            source.shift,
            source.energy_type,
            source.subenergy,
            source.UOM,
        ]

        with self.session:
            # The debug loop that printed every result row to stdout was
            # removed; rows are returned to the caller untouched.
            return self.session.query(
                *group_columns,
                func.sum(source.value).label("total_value"),
                func.sum(source.units_produced).label("total_units"),
                *aggregation_field
            ).filter(*filters)\
            .group_by(*group_columns, *aggregation_field)\
            .all()
    def add_data(self):
        """Copy every stored 5-minute record into each of five earlier years.

        Reads all ``AggregatedEnergy5min`` rows ordered by timestamp. For each
        cumulative offset of 366, 731, 1096, 1461 and 1826 days it builds a
        copy of every row with the timestamp moved back by that offset and
        every other selected column unchanged. All copies are passed to
        ``self.bulk_insert`` in a single call, grouped by offset (nearest
        first) and ordered by timestamp within each offset.

        NOTE(review): the whole table is loaded into memory and multiplied by
        five; this looks like a one-off back-fill helper — confirm it is not
        run against large production tables.
        """
        model = AggregatedEnergy5min

        # Columns copied verbatim from the source row to each generated row.
        descriptor_fields = (
            "hiererchy_type_id",
            "hiererchy_ref_id",
            "location_id",
            "site_id",
            "plant_id",
            "shop_id",
            "line_id",
            "equipment_id",
            "area_id",
            "station_id",
            "shift",
            "energy_type",
            "subenergy",
            "UOM",
            "usage_type",
        )
        measure_fields = ("value", "units_produced")
        copied_fields = descriptor_fields + measure_fields

        # One leap-year-sized step followed by four regular years; the steps
        # accumulate so each pass lands one more year further back.
        year_steps_in_days = (366, 365, 365, 365, 365)

        with self.session:
            columns = [getattr(model, name) for name in descriptor_fields]
            columns.append(model.timestamp)
            columns.extend(getattr(model, name) for name in measure_fields)

            rows = self.session.query(*columns).order_by(model.timestamp).all()

            copies = []
            offset_days = 0
            for step in year_steps_in_days:
                offset_days += step
                shift_back = datetime.timedelta(days=offset_days)
                copies.extend(
                    model(
                        timestamp=row.timestamp - shift_back,
                        **{name: getattr(row, name) for name in copied_fields}
                    )
                    for row in rows
                )

            self.bulk_insert(copies)
    


Editor is loading...
Leave a Comment