Untitled
unknown
plain_text
8 months ago
20 kB
4
Indexable
from EM.repositories.repository import Repository
from EM.repositories.models.aggregation_energy_tracking import AggregationEnergyTracking
from EM.repositories.models.aggregated_energy_5min import AggregatedEnergy5min
from EM.repositories.models.location import Location
from EM.repositories.models.plant import Plant
from EM.repositories.models.equipment import Equipment
from EM.repositories.models.shop import Shop
from EM.repositories.models.line import Line
from EM.utilities.constants import AggregationType
from sqlalchemy import Date, Interval, and_, case, cast, desc, distinct, func, Integer, literal, not_, or_, select, \
tuple_, nullslast
from EM.repositories.models.aggregated_energy_daily import AggregatedEnergydaily
from EM.utilities.constants import LOCATION,PLANT,SHOP,LINE,EQUIPMENT
from sqlalchemy.sql.expression import extract
import datetime
class EnergyMeasurementRepository(Repository):
    """Queries and back-fill helpers for the 5-minute aggregated energy table.

    Reads from ``AggregatedEnergy5min`` and ``AggregationEnergyTracking``
    through the SQLAlchemy session supplied at construction time.
    """

    # Default day offsets used by add_data(): one 366-day step back, then four
    # further 365-day steps (same values the original hard-coded loops used).
    _DEFAULT_BACKFILL_DAY_OFFSETS = (
        366,
        366 + 365,
        366 + 365 * 2,
        366 + 365 * 3,
        366 + 365 * 4,
    )

    def __init__(self, session):
        """Keep a reference to the session used by every method of this class."""
        self.session = session

    def get_latest_aggregation_timestamp(self, aggregation_type):
        """Return the newest tracking row for ``aggregation_type``.

        "Newest" means the row with the highest ``AggregationEnergyTracking.id``.

        Args:
            aggregation_type: Value matched against
                ``AggregationEnergyTracking.aggregation_type``.

        Returns:
            A row of ``(id, timestamp, aggregation_type)``, or ``None`` when no
            tracking row exists for that type.
        """
        with self.session:
            tracking_query = self.session.query(
                AggregationEnergyTracking.id,
                AggregationEnergyTracking.timestamp,
                AggregationEnergyTracking.aggregation_type,
            )
            tracking_query = tracking_query.filter(
                AggregationEnergyTracking.aggregation_type == aggregation_type
            )
            return tracking_query.order_by(desc(AggregationEnergyTracking.id)).first()

    def get_aggregated_data(self, start_timestamp, end_timestamp, aggregation_type="hour"):
        """Sum 5-minute energy values into coarser time buckets.

        Args:
            start_timestamp (datetime): Lower bound of the period (inclusive).
            end_timestamp (datetime): Upper bound of the period (inclusive,
                because the filter uses SQL ``BETWEEN``).
            aggregation_type (str, optional): Precision passed to
                ``date_trunc`` (e.g. 'hour' or 'day'). Defaults to "hour".

        Returns:
            list: Rows of ``(aggregated_time, total_energy, <dimension columns>)``
            ordered by ``aggregated_time``; one row per bucket and distinct
            combination of the dimension columns.
        """
        # Bucket start obtained by truncating the timestamp to the requested precision.
        aggregation_column = func.date_trunc(
            aggregation_type, AggregatedEnergy5min.timestamp
        ).label("aggregated_time")

        # Dimension columns are both selected and grouped on, so they are
        # declared once and reused for the two clauses.
        dimensions = [
            AggregatedEnergy5min.hiererchy_type_id,
            AggregatedEnergy5min.hiererchy_ref_id,
            AggregatedEnergy5min.location_id,
            AggregatedEnergy5min.site_id,
            AggregatedEnergy5min.plant_id,
            AggregatedEnergy5min.shop_id,
            AggregatedEnergy5min.line_id,
            AggregatedEnergy5min.equipment_id,
            AggregatedEnergy5min.area_id,
            AggregatedEnergy5min.station_id,
            AggregatedEnergy5min.shift,
            AggregatedEnergy5min.energy_type,
            AggregatedEnergy5min.subenergy,
            AggregatedEnergy5min.UOM,
            AggregatedEnergy5min.usage_type,
        ]

        with self.session:
            return (
                self.session.query(
                    aggregation_column,
                    func.sum(AggregatedEnergy5min.value).label("total_energy"),
                    *dimensions,
                )
                .filter(AggregatedEnergy5min.timestamp.between(start_timestamp, end_timestamp))
                .group_by(aggregation_column, *dimensions)
                .order_by("aggregated_time")
                .all()
            )

    @staticmethod
    def _time_range_filters(start_year, end_year, start_month, end_month,
                            start_date, end_date):
        """Build the timestamp filter conditions for the consumption query."""
        timestamp = AggregatedEnergy5min.timestamp
        conditions = []

        # Year: a closed range when both bounds are given, else an exact match.
        if start_year is not None and end_year is not None:
            conditions.append(extract('year', timestamp) >= start_year)
            conditions.append(extract('year', timestamp) <= end_year)
        elif start_year is not None:
            conditions.append(extract('year', timestamp) == start_year)

        # Month: same rule as for the year.
        # NOTE(review): the month range is applied independently of the year,
        # so a range such as Nov..Feb (start_month > end_month) matches no
        # rows - confirm callers never request that.
        if start_month is not None and end_month is not None:
            conditions.append(extract('month', timestamp) >= start_month)
            conditions.append(extract('month', timestamp) <= end_month)
        elif start_month is not None:
            conditions.append(extract('month', timestamp) == start_month)

        # Explicit date range is only applied when both bounds are supplied.
        if start_date is not None and end_date is not None:
            conditions.append(timestamp >= start_date)
            conditions.append(timestamp <= end_date)

        return conditions

    @staticmethod
    def _period_columns(aggregation_type):
        """Return the labelled time-bucket columns for ``aggregation_type``.

        An unrecognised (or ``None``) aggregation type yields an empty list,
        i.e. no time bucketing.
        """
        timestamp = AggregatedEnergy5min.timestamp
        if aggregation_type == AggregationType.Yearly.value:
            return [extract('year', timestamp).label('year')]
        if aggregation_type == AggregationType.Monthly.value:
            return [
                extract('year', timestamp).label('year'),
                extract('month', timestamp).label('month'),
            ]
        if aggregation_type == AggregationType.Weekly.value:
            # NOTE(review): pairs the calendar year with the database's week
            # number; verify this is the intended bucketing around New Year.
            return [
                extract('year', timestamp).label('year'),
                extract('week', timestamp).label('week'),
            ]
        if aggregation_type == AggregationType.Daily.value:
            return [func.date(timestamp).label('date')]
        return []

    def get_energy_consumption_aggregation(self,
                                           start_year=None,
                                           end_year=None,
                                           start_month=None,
                                           end_month=None,
                                           plant_id=None,
                                           location_id=None,
                                           energy_type=None,
                                           aggregation_type=None,
                                           start_date=None,
                                           end_date=None,
                                           aggregation_level=None,
                                           shift=None):
        """Aggregate energy value and units produced by period and hierarchy level.

        Args:
            start_year, end_year: Year filter. Both given -> inclusive range;
                only ``start_year`` -> exact year.
            start_month, end_month: Month filter with the same rule as the years.
            plant_id, location_id, energy_type, shift: Optional equality
                filters; ignored when falsy.
            aggregation_type: One of the ``AggregationType`` values (Yearly,
                Monthly, Weekly, Daily) selecting the time-bucket columns.
            start_date, end_date: Inclusive timestamp range; applied only when
                both are given.
            aggregation_level: One of LOCATION, PLANT, SHOP, LINE, EQUIPMENT.
                Every level from LOCATION down to the requested one is grouped
                on; the name of each level that has a joined model is selected.

        Returns:
            list: Rows containing, in order: ``site_id``, the level name
            columns, the level id columns (plus ``shift`` when a shift filter
            is given), ``shift``, ``energy_type``, ``subenergy``, ``UOM``,
            ``total_value``, ``total_units`` and the period columns.
        """
        filters = self._time_range_filters(
            start_year, end_year, start_month, end_month, start_date, end_date
        )

        # Hierarchy from coarsest to finest: (level constant, id column,
        # model to outer-join for the name, label of the name column).
        # NOTE(review): PLANT has no name join in the original code although
        # a Plant model is imported - confirm whether that is intentional.
        hierarchy = [
            (LOCATION, AggregatedEnergy5min.location_id, Location, "location_name"),
            (PLANT, AggregatedEnergy5min.plant_id, None, None),
            (SHOP, AggregatedEnergy5min.shop_id, Shop, "shop_name"),
            (LINE, AggregatedEnergy5min.line_id, Line, "line_name"),
            (EQUIPMENT, AggregatedEnergy5min.equipment_id, Equipment, "equipment_name"),
        ]
        level_keys = [entry[0] for entry in hierarchy]

        level_id_columns = []   # id columns of every level down to the requested one
        name_columns = []       # labelled name columns of the joined models
        join_statements = []    # (model, on-clause) pairs for the outer joins
        if aggregation_level in level_keys:
            depth = level_keys.index(aggregation_level) + 1
            for _, id_column, model, name_label in hierarchy[:depth]:
                level_id_columns.append(id_column)
                if model is not None:
                    join_statements.append((model, id_column == model.id))
                    name_columns.append(model.name.label(name_label))

        # Columns used for selection/ordering at the requested level; a shift
        # filter additionally exposes the shift column here (as the original did).
        aggregation_level_arr = list(level_id_columns)
        if shift:
            aggregation_level_arr.append(AggregatedEnergy5min.shift)
            filters.append(AggregatedEnergy5min.shift == shift)

        if location_id:
            filters.append(AggregatedEnergy5min.location_id == location_id)
        if plant_id:
            filters.append(AggregatedEnergy5min.plant_id == plant_id)
        if energy_type:
            filters.append(AggregatedEnergy5min.energy_type == energy_type)

        aggregation_field = self._period_columns(aggregation_type)

        # SELECT list keeps the exact column order of the original implementation.
        select_fields = [
            AggregatedEnergy5min.site_id,
            *name_columns,
            *aggregation_level_arr,
            AggregatedEnergy5min.shift,
            AggregatedEnergy5min.energy_type,
            AggregatedEnergy5min.subenergy,
            AggregatedEnergy5min.UOM,
            func.sum(AggregatedEnergy5min.value).label("total_value"),
            func.sum(AggregatedEnergy5min.units_produced).label("total_units"),
            *aggregation_field,
        ]

        # GROUP BY must contain only the non-aggregated columns: grouping on
        # the SUM() expressions themselves is rejected by the database.
        group_by_fields = [
            AggregatedEnergy5min.site_id,
            *name_columns,
            *level_id_columns,
            AggregatedEnergy5min.shift,
            AggregatedEnergy5min.energy_type,
            AggregatedEnergy5min.subenergy,
            AggregatedEnergy5min.UOM,
            *aggregation_field,
        ]

        # Order by the period columns that are actually grouped on (ordering a
        # grouped query by ungrouped timestamp parts is invalid), then by level.
        order_by_fields = [*aggregation_field, *aggregation_level_arr]

        with self.session:
            query = self.session.query(*select_fields)
            # Outer joins so rows without a matching name record are kept.
            for table, condition in join_statements:
                query = query.join(table, condition, isouter=True)
            return (
                query.filter(*filters)
                .group_by(*group_by_fields)
                .order_by(*order_by_fields)
                .all()
            )

    @staticmethod
    def _copy_with_day_offset(row, days):
        """Build a new ``AggregatedEnergy5min`` equal to ``row`` but ``days`` earlier."""
        return AggregatedEnergy5min(
            hiererchy_type_id=row.hiererchy_type_id,
            hiererchy_ref_id=row.hiererchy_ref_id,
            location_id=row.location_id,
            site_id=row.site_id,
            plant_id=row.plant_id,
            shop_id=row.shop_id,
            line_id=row.line_id,
            area_id=row.area_id,
            station_id=row.station_id,
            equipment_id=row.equipment_id,
            shift=row.shift,
            energy_type=row.energy_type,
            subenergy=row.subenergy,
            UOM=row.UOM,
            value=row.value,
            # Only the timestamp differs from the source row.
            timestamp=row.timestamp - datetime.timedelta(days=days),
            usage_type=row.usage_type,
            units_produced=row.units_produced,
        )

    def add_data(self, day_offsets=None):
        """Duplicate every existing 5-minute record at earlier timestamps.

        Loads all rows of ``AggregatedEnergy5min`` ordered by timestamp and,
        for each offset, creates a copy of every row with its timestamp moved
        back by that many days. All copies are then handed to
        ``self.bulk_insert`` (not defined in this class; presumably provided
        by ``Repository``).

        Args:
            day_offsets: Optional iterable of day counts to shift back by.
                Defaults to 366, 731, 1096, 1461 and 1826 days.
        """
        if day_offsets is None:
            day_offsets = self._DEFAULT_BACKFILL_DAY_OFFSETS

        with self.session:
            source_rows = self.session.query(
                AggregatedEnergy5min.hiererchy_type_id,
                AggregatedEnergy5min.hiererchy_ref_id,
                AggregatedEnergy5min.location_id,
                AggregatedEnergy5min.site_id,
                AggregatedEnergy5min.plant_id,
                AggregatedEnergy5min.shop_id,
                AggregatedEnergy5min.line_id,
                AggregatedEnergy5min.equipment_id,
                AggregatedEnergy5min.area_id,
                AggregatedEnergy5min.station_id,
                AggregatedEnergy5min.shift,
                AggregatedEnergy5min.energy_type,
                AggregatedEnergy5min.subenergy,
                AggregatedEnergy5min.UOM,
                AggregatedEnergy5min.usage_type,
                AggregatedEnergy5min.timestamp,
                AggregatedEnergy5min.value,
                AggregatedEnergy5min.units_produced,
            ).order_by(AggregatedEnergy5min.timestamp).all()

            # Offset-major order: all copies for the first offset, then all
            # copies for the second, and so on.
            shifted_copies = [
                self._copy_with_day_offset(row, days)
                for days in day_offsets
                for row in source_rows
            ]
            self.bulk_insert(shifted_copies)
Editor is loading...
Leave a Comment