Untitled
unknown
plain_text
a year ago
2.7 kB
2
Indexable
Never
def prepare_moving_aggregate_query( start_dt: str, end_dt: str, agg_size_in_days: int, table_info: TableInfo, monthly_table_name: str, include_monthly: bool ) -> str: # If include_monthly is False, just create an empty CTE for monthly_aggregates. monthly_aggregates_cte = """ monthly_aggregates AS ( SELECT NULL as cust_id, NULL as token_idx, NULL as value_sum, NULL as agg_days, NULL as source_name, NULL as end_observation WHERE FALSE ) """ if not include_monthly else f""" monthly_aggregates AS ( SELECT m.cust_id, m.token_idx, m.value_sum, 30 as agg_days, -- Assuming 30 days for monthly aggregate 'monthly_source_name' as source_name, -- Adjust source name as needed d.interval_end_dt as end_observation FROM date_range d JOIN {monthly_table_name} m ON m.end_observation = DATE_TRUNC(d.interval_end_dt, MONTH) ) """ # Combine everything into the main query return f""" WITH date_range AS ( SELECT (DATE_ADD(DATE('{start_dt}'), INTERVAL x DAY)) as interval_end_dt FROM UNNEST(GENERATE_ARRAY(0, DATE_DIFF(DATE('{end_dt}'), DATE('{start_dt}'), DAY))) as x ), daily_aggregates AS ( SELECT t.cust_id, t.token_idx, SUM(t.value_sum) AS value_sum, {agg_size_in_days} as agg_days, '{table_info.source_name}' as source_name, d.interval_end_dt as end_observation FROM date_range d JOIN {table_info.input_table_name} t ON t.end_observation BETWEEN DATE_SUB(d.interval_end_dt, INTERVAL {agg_size_in_days - 1} DAY) AND d.interval_end_dt WHERE t.end_observation > '2022-01-01' -- to satisfy need to use where clause on partitioned col AND t.end_observation BETWEEN DATE_SUB('{start_dt}', INTERVAL {agg_size_in_days} DAY) AND '{end_dt}' GROUP BY d.interval_end_dt, t.cust_id, t.token_idx ), {monthly_aggregates_cte} -- Use the conditioned CTE here SELECT * FROM daily_aggregates UNION ALL SELECT * FROM monthly_aggregates ORDER BY end_observation, cust_id, token_idx """ # You'll call this function with the additional parameters for monthly_table_name and include_monthly.