def prepare_moving_aggregate_query(
    start_dt: str,
    end_dt: str,
    agg_size_in_days: int,
    table_info: TableInfo,
    monthly_table_name: str,
    include_monthly: bool
) -> str:
    """Build the SQL text for a daily moving aggregate, optionally with monthly rows.

    The generated query (the syntax appears to target BigQuery Standard SQL)
    consists of:

    * ``date_range`` - one row per day from ``start_dt`` to ``end_dt``
      inclusive, exposed as ``interval_end_dt``.
    * ``daily_aggregates`` - for every ``interval_end_dt``, the sum of
      ``value_sum`` per ``(cust_id, token_idx)`` over the rows of
      ``table_info.input_table_name`` whose ``end_observation`` lies in the
      ``agg_size_in_days``-day window ending on that date.
    * ``monthly_aggregates`` (only when ``include_monthly`` is true) - rows of
      ``monthly_table_name`` whose ``end_observation`` equals the first day of
      the month of each ``interval_end_dt``, tagged with ``agg_days = 30`` and
      the placeholder source name ``'monthly_source_name'``.

    The final result is ordered by ``end_observation``, ``cust_id``,
    ``token_idx``.

    Args:
        start_dt: First interval end date, as a date string the SQL engine
            can parse (e.g. ``'2023-01-01'``).
        end_dt: Last interval end date, same format as ``start_dt``.
        agg_size_in_days: Length of the moving window in days; must be >= 1.
        table_info: Object providing ``source_name`` and ``input_table_name``
            attributes for the daily input table.
        monthly_table_name: Table holding monthly aggregates. Ignored when
            ``include_monthly`` is false.
        include_monthly: Whether to append the monthly rows to the result.

    Returns:
        The SQL query as a string.

    Raises:
        ValueError: If ``agg_size_in_days`` is smaller than 1.

    Note:
        All arguments are interpolated into the SQL text without escaping, so
        they must come from trusted code, never from end-user input.
    """
    if agg_size_in_days < 1:
        # A window shorter than one day yields an inverted BETWEEN range and a
        # silently empty result; fail loudly instead.
        raise ValueError(
            f"agg_size_in_days must be >= 1, got {agg_size_in_days!r}"
        )

    if include_monthly:
        # The leading comma chains this CTE after daily_aggregates.
        monthly_cte = f""",
    monthly_aggregates AS (
        SELECT
            m.cust_id,
            m.token_idx,
            m.value_sum,
            30 as agg_days,  -- Assuming 30 days for monthly aggregate
            'monthly_source_name' as source_name,  -- Adjust source name as needed
            d.interval_end_dt as end_observation
        FROM
            date_range d
        JOIN
            {monthly_table_name} m
            ON m.end_observation = DATE_TRUNC(d.interval_end_dt, MONTH)
    )"""
        monthly_union = """
    UNION ALL
    SELECT * FROM monthly_aggregates"""
    else:
        # Leave the monthly branch out entirely rather than emitting an empty
        # placeholder CTE: a FROM-less "SELECT NULL ... WHERE FALSE" is not
        # accepted by BigQuery, and its untyped NULL columns would not line up
        # with the typed daily columns in the UNION ALL.
        monthly_cte = ""
        monthly_union = ""

    return f"""
    WITH date_range AS (
        SELECT (DATE_ADD(DATE('{start_dt}'), INTERVAL x DAY)) as interval_end_dt
        FROM UNNEST(GENERATE_ARRAY(0, DATE_DIFF(DATE('{end_dt}'), DATE('{start_dt}'), DAY))) as x
    ),
    daily_aggregates AS (
        SELECT
            t.cust_id,
            t.token_idx,
            SUM(t.value_sum) AS value_sum,
            {agg_size_in_days} as agg_days,
            '{table_info.source_name}' as source_name,
            d.interval_end_dt as end_observation
        FROM
            date_range d
        JOIN
            {table_info.input_table_name} t
            ON t.end_observation BETWEEN DATE_SUB(d.interval_end_dt, INTERVAL {agg_size_in_days - 1} DAY)
                                     AND d.interval_end_dt
        WHERE
            t.end_observation > '2022-01-01'  -- to satisfy need to use where clause on partitioned col
            AND t.end_observation BETWEEN DATE_SUB('{start_dt}', INTERVAL {agg_size_in_days} DAY) AND '{end_dt}'
        GROUP BY
            d.interval_end_dt,
            t.cust_id,
            t.token_idx
    ){monthly_cte}
    SELECT * FROM daily_aggregates{monthly_union}
    ORDER BY
        end_observation,
        cust_id,
        token_idx
    """
# Usage note: callers must also pass monthly_table_name and include_monthly when calling this function.