Untitled
unknown
plain_text
10 months ago
17 kB
16
Indexable
import datetime
import logging

from sqlalchemy import Date, Interval, and_, case, cast, desc, distinct, func, Integer, literal, not_, or_, select, \
    tuple_, nullslast
from sqlalchemy.sql.expression import extract

from EM.repositories.repository import Repository
from EM.repositories.models.aggregation_energy_tracking import AggregationEnergyTracking
from EM.repositories.models.aggregated_energy_5min import AggregatedEnergy5min
from EM.repositories.models.aggregated_energy_daily import AggregatedEnergydaily
from EM.utilities.constants import AggregationType
from EM.utilities.constants import LOCATION,PLANT,SHOP,LINE,EQUIPMENT
_LOGGER = logging.getLogger(__name__)


class EnergyMeasurementRepository(Repository):
    """Queries over the 5-minute aggregated energy table and its tracking table.

    Every public method opens ``self.session`` as a context manager around
    the statements it runs.
    """

    # Day offsets used by ``add_data``: the first step is 366 days and each
    # further step adds 365 (366, 731, 1096, 1461, 1826).
    # NOTE(review): presumably 366 accounts for one leap year in the range;
    # confirm against the data this was written for.
    _BACKFILL_DAY_OFFSETS = tuple(366 + 365 * step for step in range(5))

    def __init__(self, session):
        """Store the SQLAlchemy session used by every query of this repository.

        Args:
            session: Session object; it is used as a context manager and
                through ``.query(...)`` by the methods below.
        """
        # NOTE(review): the base ``Repository.__init__`` is not called here;
        # confirm that the base class does not need its own initialisation.
        self.session = session

    @staticmethod
    def _dimension_columns():
        """Return the 15 descriptive columns of a 5-minute row, in select order.

        The order matters: callers may read result rows positionally, so it
        must stay in sync with the historical column order.
        """
        return (
            AggregatedEnergy5min.hiererchy_type_id,
            AggregatedEnergy5min.hiererchy_ref_id,
            AggregatedEnergy5min.location_id,
            AggregatedEnergy5min.site_id,
            AggregatedEnergy5min.plant_id,
            AggregatedEnergy5min.shop_id,
            AggregatedEnergy5min.line_id,
            AggregatedEnergy5min.equipment_id,
            AggregatedEnergy5min.area_id,
            AggregatedEnergy5min.station_id,
            AggregatedEnergy5min.shift,
            AggregatedEnergy5min.energy_type,
            AggregatedEnergy5min.subenergy,
            AggregatedEnergy5min.UOM,
            AggregatedEnergy5min.usage_type,
        )

    def get_latest_aggregation_timestamp(self, aggregation_type):
        """Return the newest tracking row for one aggregation type.

        Args:
            aggregation_type: Value matched against
                ``AggregationEnergyTracking.aggregation_type``.

        Returns:
            A row ``(id, timestamp, aggregation_type)`` with the highest
            ``id`` for that type, or ``None`` when no row matches.
        """
        with self.session:
            return (
                self.session.query(
                    AggregationEnergyTracking.id,
                    AggregationEnergyTracking.timestamp,
                    AggregationEnergyTracking.aggregation_type,
                )
                .filter(AggregationEnergyTracking.aggregation_type == aggregation_type)
                # "Latest" is decided by the highest id, not by timestamp.
                .order_by(desc(AggregationEnergyTracking.id))
                .first()
            )

    def get_aggregated_data(self, start_timestamp, end_timestamp, aggregation_type="hour"):
        """Fetch 5-minute energy rows summed per time bucket within a time range.

        Args:
            start_timestamp (datetime): Lower bound of the range (inclusive).
            end_timestamp (datetime): Upper bound of the range (inclusive).
            aggregation_type (str, optional): Precision handed to
                ``date_trunc``, e.g. ``'hour'`` or ``'day'``. Defaults to
                ``"hour"``.

        Returns:
            list: Rows of ``(aggregated_time, total_energy, <15 dimension
            columns>)``, ordered by ``aggregated_time``.
        """
        # NOTE(review): BETWEEN includes both bounds, so two consecutive
        # windows that share a boundary timestamp would both count that row.
        dimensions = self._dimension_columns()
        bucket = func.date_trunc(
            aggregation_type, AggregatedEnergy5min.timestamp
        ).label("aggregated_time")
        total_energy = func.sum(AggregatedEnergy5min.value).label("total_energy")

        with self.session:
            return (
                self.session.query(bucket, total_energy, *dimensions)
                .filter(AggregatedEnergy5min.timestamp.between(start_timestamp, end_timestamp))
                # One output row per bucket and per distinct dimension tuple.
                .group_by(bucket, *dimensions)
                .order_by("aggregated_time")
                .all()
            )

    def get_energy_consumption_aggregation(self,
                                           start_year=None,
                                           end_year=None,
                                           start_month=None,
                                           end_month=None,
                                           plant_id=None,
                                           location_id=None,
                                           energy_type=None,
                                           aggregation_type=None,
                                           start_date=None,
                                           end_date=None,
                                           aggregation_level=None,
                                           shift=None):
        """Sum energy value and units produced, grouped by hierarchy and period.

        Args:
            start_year, end_year: With both set, keep rows whose calendar year
                lies in ``[start_year, end_year]``. With only ``start_year``,
                keep that single year. ``end_year`` alone is ignored.
            start_month, end_month: Same rules as the year pair, applied to
                the month number.
            plant_id, location_id, energy_type: Equality filters, applied
                only when the argument is truthy.
            aggregation_type: One of the ``AggregationType`` values
                (Yearly, Monthly, Weekly, Daily); selects the period columns.
                Any other value adds no period column.
            start_date, end_date: Inclusive timestamp range, applied only
                when both are given.
            aggregation_level: One of ``LOCATION``, ``PLANT``, ``SHOP``,
                ``LINE``, ``EQUIPMENT``; selects how deep the hierarchy
                grouping goes. Any other value adds no hierarchy column.
            shift: When truthy, filter on this shift.

        Returns:
            list: Rows of ``(site_id, <hierarchy columns>, shift, energy_type,
            subenergy, UOM, total_value, total_units, <period columns>)``.
        """
        # NOTE(review): the month bounds are checked independently of the
        # year, so a range that crosses a year boundary (start_month >
        # end_month) matches nothing. Confirm whether that is intended.
        timestamp = AggregatedEnergy5min.timestamp
        conditions = []

        if start_year is not None and end_year is not None:
            conditions.append(extract('year', timestamp) >= start_year)
            conditions.append(extract('year', timestamp) <= end_year)
        elif start_year is not None:
            conditions.append(extract('year', timestamp) == start_year)

        if start_month is not None and end_month is not None:
            conditions.append(extract('month', timestamp) >= start_month)
            conditions.append(extract('month', timestamp) <= end_month)
        elif start_month is not None:
            conditions.append(extract('month', timestamp) == start_month)

        if start_date is not None and end_date is not None:
            conditions.append(timestamp >= start_date)
            conditions.append(timestamp <= end_date)

        # Each level groups by its own column plus every ancestor above it.
        hierarchy = (
            AggregatedEnergy5min.location_id,
            AggregatedEnergy5min.plant_id,
            AggregatedEnergy5min.shop_id,
            AggregatedEnergy5min.line_id,
            AggregatedEnergy5min.equipment_id,
        )
        level_columns = []
        for depth, level in enumerate((LOCATION, PLANT, SHOP, LINE, EQUIPMENT), start=1):
            if aggregation_level == level:
                level_columns = list(hierarchy[:depth])
                break

        if shift:
            # NOTE(review): shift is also always selected below, so this adds
            # the column a second time. Kept so the row shape seen by existing
            # callers does not change.
            level_columns.append(AggregatedEnergy5min.shift)
            conditions.append(AggregatedEnergy5min.shift == shift)
        if location_id:
            conditions.append(AggregatedEnergy5min.location_id == location_id)
        if plant_id:
            conditions.append(AggregatedEnergy5min.plant_id == plant_id)
        if energy_type:
            conditions.append(AggregatedEnergy5min.energy_type == energy_type)

        period_columns = []
        if aggregation_type == AggregationType.Yearly.value:
            period_columns = [extract('year', timestamp).label('year')]
        elif aggregation_type == AggregationType.Monthly.value:
            period_columns = [
                extract('year', timestamp).label('year'),
                extract('month', timestamp).label('month'),
            ]
        elif aggregation_type == AggregationType.Weekly.value:
            # 'week' is the ISO 8601 week number, so it has to be paired with
            # the ISO week-numbering year. With the calendar year, late
            # December days that belong to week 1 would be merged into the
            # previous January's week 1.
            # NOTE(review): assumes a PostgreSQL-compatible backend (this
            # class already relies on date_trunc).
            period_columns = [
                extract('isoyear', timestamp).label('year'),
                extract('week', timestamp).label('week'),
            ]
        elif aggregation_type == AggregationType.Daily.value:
            period_columns = [func.date(timestamp).label('date')]

        group_columns = [
            AggregatedEnergy5min.site_id,
            *level_columns,
            AggregatedEnergy5min.shift,
            AggregatedEnergy5min.energy_type,
            AggregatedEnergy5min.subenergy,
            AggregatedEnergy5min.UOM,
        ]

        with self.session:
            rows = (
                self.session.query(
                    *group_columns,
                    func.sum(AggregatedEnergy5min.value).label("total_value"),
                    func.sum(AggregatedEnergy5min.units_produced).label("total_units"),
                    *period_columns,
                )
                .filter(*conditions)
                .group_by(*group_columns, *period_columns)
                .all()
            )

        # Log a summary instead of printing every row to stdout.
        _LOGGER.debug("get_energy_consumption_aggregation returned %d row(s)", len(rows))
        return rows

    @staticmethod
    def _shifted_copy(row, days):
        """Build a new 5-minute record equal to ``row`` but ``days`` days earlier."""
        return AggregatedEnergy5min(
            hiererchy_type_id=row.hiererchy_type_id,
            hiererchy_ref_id=row.hiererchy_ref_id,
            location_id=row.location_id,
            site_id=row.site_id,
            plant_id=row.plant_id,
            shop_id=row.shop_id,
            line_id=row.line_id,
            area_id=row.area_id,
            station_id=row.station_id,
            equipment_id=row.equipment_id,
            shift=row.shift,
            energy_type=row.energy_type,
            subenergy=row.subenergy,
            UOM=row.UOM,
            value=row.value,
            # Source timestamp moved back in time, not an aggregation time.
            timestamp=row.timestamp - datetime.timedelta(days=days),
            usage_type=row.usage_type,
            units_produced=row.units_produced,
        )

    def add_data(self):
        """Duplicate every 5-minute row at five earlier points in time.

        Reads the whole table ordered by timestamp, then inserts one copy of
        each row per entry of ``_BACKFILL_DAY_OFFSETS``, with the timestamp
        moved back by that many days. Copies are inserted offset by offset,
        each batch in source timestamp order, through ``self.bulk_insert``.

        NOTE(review): the whole table and five copies of it are held in
        memory, and every call copies all rows present at that moment,
        including copies made by an earlier call. This looks like a one-off
        seeding helper; confirm before running it on real data.
        """
        with self.session:
            source_rows = (
                self.session.query(
                    *self._dimension_columns(),
                    AggregatedEnergy5min.timestamp,
                    AggregatedEnergy5min.value,
                    AggregatedEnergy5min.units_produced,
                )
                .order_by(AggregatedEnergy5min.timestamp)
                .all()
            )
            copies = [
                self._shifted_copy(row, days)
                for days in self._BACKFILL_DAY_OFFSETS
                for row in source_rows
            ]
            self.bulk_insert(copies)
Editor is loading...
Leave a Comment