Untitled

 avatar
unknown
plain_text
17 days ago
20 kB
3
Indexable
from EM.repositories.repository import Repository
from EM.repositories.models.aggregation_energy_tracking import AggregationEnergyTracking
from EM.repositories.models.aggregated_energy_5min import AggregatedEnergy5min
from EM.repositories.models.location import Location
from EM.repositories.models.plant import Plant
from EM.repositories.models.equipment import Equipment
from EM.repositories.models.shop import Shop
from EM.repositories.models.line import Line

from EM.utilities.constants import AggregationType
from sqlalchemy import Date, Interval, and_, case, cast, desc, distinct, func, Integer, literal, not_, or_, select, \
    tuple_, nullslast
from EM.repositories.models.aggregated_energy_daily import AggregatedEnergydaily
from EM.utilities.constants import LOCATION,PLANT,SHOP,LINE,EQUIPMENT
from sqlalchemy.sql.expression import extract
import datetime
class EnergyMeasurementRepository(Repository):

    def __init__(self, session):
        """Store the session that every query method of this repository uses.

        Args:
            session: Object used both as a context manager (``with self.session``)
                and as the query factory (``self.session.query(...)``) by the
                other methods of this class.
        """
        self.session = session

    def get_latest_aggregation_timestamp(self,aggregation_type):
        """Return the newest tracking entry recorded for ``aggregation_type``.

        "Newest" means the entry with the highest ``id``; the stored
        timestamp is returned but is not used for ordering.

        Args:
            aggregation_type: Value matched against
                ``AggregationEnergyTracking.aggregation_type``.

        Returns:
            The first row of ``(id, timestamp, aggregation_type)`` in
            descending ``id`` order, or ``None`` when no entry matches.
        """
        tracking = AggregationEnergyTracking

        with self.session:
            matching = self.session.query(
                tracking.id,
                tracking.timestamp,
                tracking.aggregation_type,
            ).filter(tracking.aggregation_type == aggregation_type)

            # Highest id first, so ``first()`` yields the latest entry.
            newest_first = matching.order_by(desc(tracking.id))
            return newest_first.first()
        
    def get_aggregated_data(self, start_timestamp, end_timestamp, aggregation_type="hour"):
        """Roll 5-minute energy rows up into coarser time buckets.

        Rows whose timestamp lies between ``start_timestamp`` and
        ``end_timestamp`` (SQL ``BETWEEN``, so both bounds are inclusive) are
        grouped by the truncated timestamp plus every hierarchy/classification
        column, and their ``value`` is summed per group.

        Args:
            start_timestamp (datetime): Lower bound of the period.
            end_timestamp (datetime): Upper bound of the period.
            aggregation_type (str, optional): Precision handed to
                ``date_trunc``, e.g. 'hour' or 'day'. Defaults to "hour".

        Returns:
            list: Result rows ordered by bucket. Each row holds
            ``aggregated_time``, ``total_energy`` and then the grouping
            columns in the order listed in ``dimensions`` below.
        """
        source = AggregatedEnergy5min

        # Time bucket and the summed measure.
        bucket = func.date_trunc(aggregation_type, source.timestamp).label("aggregated_time")
        energy_sum = func.sum(source.value).label("total_energy")

        # Columns that are both returned and grouped on, declared once so the
        # SELECT list and the GROUP BY list cannot drift apart.
        dimensions = (
            source.hiererchy_type_id,
            source.hiererchy_ref_id,
            source.location_id,
            source.site_id,
            source.plant_id,
            source.shop_id,
            source.line_id,
            source.equipment_id,
            source.area_id,
            source.station_id,
            source.shift,
            source.energy_type,
            source.subenergy,
            source.UOM,
            source.usage_type,
        )

        in_window = source.timestamp.between(start_timestamp, end_timestamp)

        with self.session:  # session is used as a context manager around the query
            grouped = (
                self.session.query(bucket, energy_sum, *dimensions)
                .filter(in_window)
                .group_by(bucket, *dimensions)
                .order_by("aggregated_time")
            )
            return grouped.all()
        
    def get_energy_consumption_aggregation(self, 
                                        start_year=None,
                                        end_year=None,
                                        start_month=None,
                                        end_month=None,
                                        plant_id=None,
                                        location_id=None,
                                        energy_type=None,
                                        aggregation_type=None,
                                        start_date=None,
                                        end_date=None,
                                        aggregation_level=None,
                                        shift=None):
        """Sum energy value and units produced per hierarchy level and time bucket.

        Reads ``AggregatedEnergy5min`` and returns one row per combination of
        the selected hierarchy columns, shift, energy type, sub-energy, UOM
        and time bucket.

        Args:
            start_year, end_year: When both are given, the calendar year of the
                timestamp must lie in ``[start_year, end_year]``. When only
                ``start_year`` is given the year must equal it. ``end_year``
                alone is ignored.
            start_month, end_month: Same rules as the year pair, applied to the
                calendar month of the timestamp.
            plant_id, location_id, energy_type: Equality filters, applied only
                when the argument is truthy.
            aggregation_type: Compared with the ``.value`` of
                ``AggregationType.Yearly`` / ``Monthly`` / ``Weekly`` /
                ``Daily`` to choose the time bucket columns (``year``;
                ``year`` + ``month``; ``year`` + ``week``; ``date``). Any other
                value adds no time bucket.
            start_date, end_date: Inclusive timestamp bounds, applied only when
                both are given.
            aggregation_level: One of ``LOCATION``, ``PLANT``, ``SHOP``,
                ``LINE``, ``EQUIPMENT``. Selects the id columns to group on
                (cumulative from location down to the chosen level) and the
                name columns to join in. Any other value adds none.
            shift: When truthy, restricts rows to this shift and adds the shift
                column to the grouping columns.

        Returns:
            list: Rows whose columns are, in order: ``site_id``; the joined
            name columns (``location_name``, ``shop_name``, ``line_name``,
            ``equipment_name`` as applicable); the grouping id columns;
            ``shift``, ``energy_type``, ``subenergy``, ``UOM``;
            ``total_value``; ``total_units``; the time bucket columns.
        """
        energy = AggregatedEnergy5min
        year_of = extract('year', energy.timestamp)
        month_of = extract('month', energy.timestamp)

        # ---- Row filters -------------------------------------------------
        filters = []

        if start_year is not None and end_year is not None:
            filters.extend([year_of >= start_year, year_of <= end_year])
        elif start_year is not None:
            filters.append(year_of == start_year)

        if start_month is not None and end_month is not None:
            filters.extend([month_of >= start_month, month_of <= end_month])
        elif start_month is not None:
            filters.append(month_of == start_month)

        if start_date is not None and end_date is not None:
            filters.extend([energy.timestamp >= start_date,
                            energy.timestamp <= end_date])

        # ---- Hierarchy level: grouping ids, joins and name columns --------
        # Tiers are cumulative: choosing a level groups on its own id column
        # and on the id column of every tier above it. A tier without a model
        # (plant) contributes an id column but no join/name column.
        hierarchy = (
            (LOCATION, energy.location_id, Location, "location_name"),
            (PLANT, energy.plant_id, None, None),
            (SHOP, energy.shop_id, Shop, "shop_name"),
            (LINE, energy.line_id, Line, "line_name"),
            (EQUIPMENT, energy.equipment_id, Equipment, "equipment_name"),
        )
        level_columns = []
        join_statements = []
        name_columns = []

        if any(aggregation_level == tier[0] for tier in hierarchy):
            for tier_key, id_column, model, name_label in hierarchy:
                level_columns.append(id_column)
                if model is not None:
                    join_statements.append((model, id_column == model.id))
                    name_columns.append(model.name.label(name_label))
                if aggregation_level == tier_key:
                    break

        if shift:
            # NOTE: shift is also selected unconditionally below, so with a
            # shift filter the column appears twice in each row. Kept as-is so
            # positional access by existing callers does not shift.
            level_columns.append(energy.shift)
            filters.append(energy.shift == shift)

        if location_id:
            filters.append(energy.location_id == location_id)
        if plant_id:
            filters.append(energy.plant_id == plant_id)
        if energy_type:
            filters.append(energy.energy_type == energy_type)

        # ---- Time bucket ---------------------------------------------------
        time_buckets = []
        if aggregation_type == AggregationType.Yearly.value:
            time_buckets = [year_of.label('year')]
        elif aggregation_type == AggregationType.Monthly.value:
            time_buckets = [year_of.label('year'), month_of.label('month')]
        elif aggregation_type == AggregationType.Weekly.value:
            time_buckets = [
                year_of.label('year'),
                extract('week', energy.timestamp).label('week'),
            ]
        elif aggregation_type == AggregationType.Daily.value:
            time_buckets = [func.date(energy.timestamp).label('date')]

        # ---- SELECT / GROUP BY / ORDER BY lists ----------------------------
        dimension_columns = [
            energy.site_id,
            *name_columns,
            *level_columns,
            energy.shift,
            energy.energy_type,
            energy.subenergy,
            energy.UOM,
        ]
        measure_columns = [
            func.sum(energy.value).label("total_value"),
            func.sum(energy.units_produced).label("total_units"),
        ]
        select_fields = [*dimension_columns, *measure_columns, *time_buckets]

        # Group on every non-aggregated output column only; the SUM columns
        # must stay out of GROUP BY or the database rejects the statement.
        group_by_fields = [*dimension_columns, *time_buckets]

        # Order only by expressions that are part of the grouping: the time
        # bucket first, then the hierarchy ids. Ordering by year/month/date
        # expressions that are not grouped is rejected by strict databases.
        order_by_fields = [*time_buckets, *level_columns]

        with self.session:
            query = self.session.query(*select_fields)

            # Outer joins keep energy rows that have no matching name row.
            for table, condition in join_statements:
                query = query.join(table, condition, isouter=True)

            query = query.filter(*filters).group_by(*group_by_fields)
            if order_by_fields:
                query = query.order_by(*order_by_fields)

            return query.all()
            
    def add_data(self):
        """Back-fill history by cloning every 5-minute row into earlier years.

        Every row currently in ``AggregatedEnergy5min`` is read (no filter)
        and copied five times. Each copy keeps all attribute values and only
        has its timestamp moved back by 366, 731, 1096, 1461 and 1826 days
        respectively. The copies are handed to ``self.bulk_insert`` in one
        call, ordered by offset first and by source timestamp within each
        offset.

        Because the source query is unfiltered, calling this method again
        also clones the rows inserted by earlier calls.
        """
        source = AggregatedEnergy5min

        # Attributes copied unchanged onto each clone; ``timestamp`` is the
        # only attribute that differs between a source row and its clones.
        copied_fields = (
            "hiererchy_type_id",
            "hiererchy_ref_id",
            "location_id",
            "site_id",
            "plant_id",
            "shop_id",
            "line_id",
            "equipment_id",
            "area_id",
            "station_id",
            "shift",
            "energy_type",
            "subenergy",
            "UOM",
            "usage_type",
            "value",
            "units_produced",
        )

        # First step back is 366 days, every further step adds 365 days.
        day_offsets = [366 + 365 * step for step in range(5)]

        with self.session:
            rows = self.session.query(
                *(getattr(source, field) for field in copied_fields),
                source.timestamp,
            ).order_by(source.timestamp).all()

            clones = [
                source(
                    timestamp=row.timestamp - datetime.timedelta(days=days),
                    **{field: getattr(row, field) for field in copied_fields},
                )
                for days in day_offsets
                for row in rows
            ]

            # bulk_insert is not defined in this class; presumably inherited
            # from Repository -- confirm its transaction/commit behaviour there.
            self.bulk_insert(clones)
    


Editor is loading...
Leave a Comment